Hi Robbie,
NOTE - I only just subscribed to the user mailing list, so I did not receive
your reply by email and read it in the mail archive instead.
Sorry in advance if this creates a new email thread.
Thanks for the quick reply to my query. Qpid-JMS-Client is widely used in our
organization, so we are trying to solve this issue with Qpid-JMS-Client,
which would be easy to roll out.
I tested with the Microsoft SDK and was able to receive messages.
The Microsoft SDK also uses the "proton-j" library.
The Microsoft SDK needs a sessionId (JMSXGroupID) to receive messages.
Please find attached:
1. Sample Microsoft SDK code - ReceiveNamedSessionAsyncSample.txt
a) Adapted from -
https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java
b) Uses - com.azure:azure-messaging-servicebus:7.3.0
2. Sample logs (including proton logs) -
Received_Session_Enabled_Message_From_Msft_SDK.txt
Could you please check the attached proton logs from the Microsoft SDK and
let me know whether we can use Qpid-JMS-Client for this purpose, perhaps with
a workaround? If that is not possible, we will need to use the Microsoft SDK.
Regards,
Abhishek Kumar
Mob - +44-7424034703
/**
*
*/
package com.sbus.test;
import com.azure.core.amqp.models.AmqpAnnotatedMessage;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.azure.messaging.servicebus.ServiceBusSessionReceiverAsyncClient;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Demonstrates how to receive messages from a named session.
*/
public class ReceiveNamedSessionAsyncSample {
String connectionString = "**";
String queueName = "test-order";
/**
* Main method to invoke this demo on how to receive messages from a session
* with id "greetings-id" in an Azure Service Bus Queue.
*
* @param args Unused arguments to the program.
*
* @throws InterruptedException If the program is unable to sleep while waiting
* for the operations to complete.
*/
public static void main(String[] args) throws InterruptedException {
ReceiveNamedSessionAsyncSample sample = new
ReceiveNamedSessionAsyncSample();
sample.run();
}
/**
* Runs this demo on how to receive messages from a session with id
* "greetings-id" in an Azure Service Bus Queue.
*
* @throws InterruptedException If the program is unable to sleep while waiting
* for the operations to complete.
*/
@Test
public void run() throws InterruptedException {
AtomicBoolean sampleSuccessful = new AtomicBoolean(true);
CountDownLatch countdownLatch = new CountDownLatch(1);
// The connection string value can be obtained by:
// 1. Going to your Service Bus namespace in Azure Portal.
// 2. Go to "Shared access policies"
// 3. Copy the connection string for the "RootManageSharedAccessKey" policy.
// The 'connectionString' format is shown below.
// 1. "Endpoint={fully-qualified-namespace};SharedAccessKeyName={policy-name};SharedAccessKey={key}"
// 2. "<<fully-qualified-namespace>>" will look similar to
// "{your-namespace}.servicebus.windows.net"
// 3. "queueName" will be the name of the Service Bus queue instance you created
// inside the Service Bus namespace.
// Create a receiver.
ServiceBusSessionReceiverAsyncClient sessionReceiver = new
ServiceBusClientBuilder()
.connectionString(connectionString)
.sessionReceiver()
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.queueName(queueName)
//.disableAutoComplete()
.buildAsyncClient();
// Receiving messages that have the sessionId "greetings-id" set. This can be
// set via ServiceBusMessage.setSessionId(String) when sending a message.
// The Mono completes successfully when a lock on the session is acquired;
// otherwise, it completes with an error.
Mono<ServiceBusReceiverAsyncClient> receiverMono =
sessionReceiver.acceptSession("greetings-id");
System.out.println("*** Start receiving message **");
// If the session is successfully accepted, begin receiving messages from it.
// Flux.usingWhen is used to dispose of the receiver after consuming messages
// completes.
Disposable subscription = Flux.usingWhen(receiverMono,
receiver -> receiver.receiveMessages(),
receiver -> Mono.fromRunnable(() -> receiver.close()))
.subscribe(message -> {
System.out.println("** SessinID **" + message.getSessionId());
System.out.println("** SequenceNumber **" +
message.getSequenceNumber());
System.out.println("** Body **" + message.getBody().toString());
// Process message.
//System.out.printf("***** Session: %s. Sequence #: %s. Contents: %s%n", message.getSessionId(),
// message.getSequenceNumber(), message.getBody());
// When this message function completes, the message is automatically
// completed. If an exception is thrown in here, the message is abandoned.
// To disable this behaviour, toggle
// ServiceBusSessionReceiverClientBuilder.disableAutoComplete()
// when building the session receiver.
}, error -> {
System.err.println("Error occurred: " + error);
sampleSuccessful.set(false);
});
System.err.println("*** Finish receiving messages **");
// Subscribe is not a blocking call so we wait here so the program does not end.
countdownLatch.await(10, TimeUnit.SECONDS);
// Disposing of the subscription will cancel the receive() operation.
subscription.dispose();
// Close the receiver.
sessionReceiver.close();
// This assertion is to ensure that samples are working. Users should remove this.
Assertions.assertTrue(sampleSuccessful.get());
}
}
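For reference, the connection string format described in the comments of the sample above can be split into its parts with plain Java. This is only a minimal sketch: the class name ConnectionStringParts and the example values are hypothetical, it is not part of the Azure SDK, and it assumes each segment is a "Key=Value" pair where only the first '=' separates key from value (a base64 key may itself end in '=').

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the Azure SDK. */
final class ConnectionStringParts {

    /** Splits "Key=Value;Key=Value" segments, treating only the first '=' as the separator. */
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String pair : connectionString.split(";")) {
            if (pair.isEmpty()) {
                continue; // tolerate empty segments such as a doubled ';'
            }
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Malformed segment: " + pair);
            }
            parts.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return parts;
    }

    public static void main(String[] args) {
        // Placeholder values only; not a real namespace, policy, or key.
        Map<String, String> parts = parse(
            "Endpoint=sb://example.servicebus.windows.net/;"
                + "SharedAccessKeyName=RootManageSharedAccessKey;"
                + "SharedAccessKey=abc123=");
        System.out.println(parts.get("Endpoint"));
        System.out.println(parts.get("SharedAccessKeyName"));
    }
}
```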
INFO 2021-07-28 17:06:49,422 [main]
com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor:
namespace[] entityPath[****.servicebus.windows.net]: Setting next AMQP channel.
INFO 2021-07-28 17:06:49,424 [main]
com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor:
namespace[] entityPath[****.servicebus.windows.net]: Next AMQP channel
received, updating 0 current subscribers
INFO 2021-07-28 17:06:49,429 [main]
com.azure.messaging.servicebus.ServiceBusClientBuilder: # of open clients with
shared connection: 1
*** Start receiving message **
INFO 2021-07-28 17:06:49,492 [main]
com.azure.core.amqp.implementation.ReactorConnection:
connectionId[MF_e00897_1627488409363]: Creating and starting connection to
****.servicebus.windows.net:5671
INFO 2021-07-28 17:06:49,528 [main]
com.azure.core.amqp.implementation.ReactorExecutor:
connectionId[MF_e00897_1627488409363], message[Starting reactor.]
INFO 2021-07-28 17:06:49,537 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.ConnectionHandler: onConnectionInit
connectionId[MF_e00897_1627488409363] hostname[****.servicebus.windows.net]
amqpHostname[****.servicebus.windows.net]
*** Finish receiving messages **
INFO 2021-07-28 17:06:49,539 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.ReactorHandler:
connectionId[MF_e00897_1627488409363] reactor.onReactorInit
INFO 2021-07-28 17:06:49,539 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.ConnectionHandler:
onConnectionLocalOpen connectionId[MF_e00897_1627488409363]
hostname[****.servicebus.windows.net] errorCondition[null]
errorDescription[null]
INFO 2021-07-28 17:06:49,632 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.ConnectionHandler: onConnectionBound
connectionId[MF_e00897_1627488409363] hostname[****.servicebus.windows.net]
peerDetails[****.servicebus.windows.net:5671]
INFO 2021-07-28 17:06:50,208 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.StrictTlsContextSpi: SSLv2Hello was
an enabled protocol. Filtering out.
[1576319144:0] -> SASL
[1576319144:0] -> SaslInit{mechanism=ANONYMOUS, initialResponse=null,
hostname='null'}
[1576319144:0] <- SASL
[1576319144:0] <- SaslMechanisms{saslServerMechanisms=[MSSBCBS, PLAIN,
ANONYMOUS, EXTERNAL]}
[1576319144:0] <- SaslOutcome{_code=OK, _additionalData=Welcome!}
[1576319144:0] -> AMQP
[1576319144:0] -> Open{ containerId='MF_e00897_1627488409363',
hostname='****.servicebus.windows.net', maxFrameSize=65536, channelMax=65535,
idleTimeOut=30000, outgoingLocales=null, incomingLocales=null,
offeredCapabilities=null, desiredCapabilities=null,
properties={product=azure-messaging-servicebus,
framework=jre:1.8.0_242;vendor:AdoptOpenJDK;jvm25.242-b08, version=7.3.0,
platform=Windows 10 10.0,
user-agent=azsdk-java-azure-messaging-servicebus/7.3.0 (1.8.0_242; Windows 10;
10.0)}}
[1576319144:0] <- AMQP
[1576319144:0] <- Open{ containerId='1748f2a9ec1b4f6787a45fe25b257dc1_G40',
hostname='null', maxFrameSize=65536, channelMax=4999, idleTimeOut=120000,
outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
INFO 2021-07-28 17:06:50,409 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.ConnectionHandler:
onConnectionRemoteOpen hostname[****.servicebus.windows.net],
connectionId[MF_e00897_1627488409363],
remoteContainer[1748f2a9ec1b4f6787a45fe25b257dc1_G40]
INFO 2021-07-28 17:06:50,409 [reactor-executor-1]
com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor:
namespace[] entityPath[****.servicebus.windows.net]: Channel is now active.
[1576319144:0] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1576319144:0] <- Begin{remoteChannel=0, nextOutgoingId=1, incomingWindow=5000,
outgoingWindow=2147483647, handleMax=255, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
INFO 2021-07-28 17:06:50,453 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.SessionHandler: onSessionRemoteOpen
connectionId[MF_e00897_1627488409363], entityName[test-order],
sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
INFO 2021-07-28 17:06:50,480 [reactor-executor-1]
com.azure.core.amqp.implementation.ReactorConnection: Setting CBS channel.
[1576319144:1] -> Begin{remoteChannel=null, nextOutgoingId=1,
incomingWindow=2147483647, outgoingWindow=2147483647, handleMax=65535,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
[1576319144:1] <- Begin{remoteChannel=1, nextOutgoingId=1, incomingWindow=5000,
outgoingWindow=2147483647, handleMax=255, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
INFO 2021-07-28 17:06:50,508 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.SessionHandler: onSessionRemoteOpen
connectionId[MF_e00897_1627488409363], entityName[cbs-session],
sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
INFO 2021-07-28 17:06:50,524 [reactor-executor-1]
com.azure.core.amqp.implementation.ReactorConnection:
connectionId[MF_e00897_1627488409363] entityPath[$cbs] linkName[cbs] Emitting
new response channel.
INFO 2021-07-28 17:06:50,524 [reactor-executor-1] class
com.azure.core.amqp.implementation.RequestResponseChannel:$cbs:
namespace[MF_e00897_1627488409363] entityPath[$cbs]: Setting next AMQP channel.
INFO 2021-07-28 17:06:50,524 [reactor-executor-1] class
com.azure.core.amqp.implementation.RequestResponseChannel:$cbs:
namespace[MF_e00897_1627488409363] entityPath[$cbs]: Next AMQP channel
received, updating 1 current subscribers
[1576319144:1] -> Attach{name='cbs:sender', handle=0, role=SENDER,
sndSettleMode=SETTLED, rcvSettleMode=FIRST, source=Source{address='null',
durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
dynamicNodeProperties=null, distributionMode=null, filter=null,
defaultOutcome=null, outcomes=null, capabilities=null},
target=Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
unsettled=null, incompleteUnsettled=false, initialDeliveryCount=0,
maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
properties=null}
[1576319144:1] -> Attach{name='cbs:receiver', handle=1, role=RECEIVER,
sndSettleMode=SETTLED, rcvSettleMode=FIRST, source=Source{address='$cbs',
durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
dynamicNodeProperties=null, distributionMode=null, filter=null,
defaultOutcome=null, outcomes=null, capabilities=null},
target=Target{address='cbs-client-reply-to', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null,
capabilities=null}, unsettled=null, incompleteUnsettled=false,
initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
[1576319144:1] <- Attach{name='cbs:sender', handle=0, role=RECEIVER,
sndSettleMode=SETTLED, rcvSettleMode=FIRST, source=Source{address='null',
durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
dynamicNodeProperties=null, distributionMode=null, filter=null,
defaultOutcome=null, outcomes=null, capabilities=null},
target=Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
unsettled=null, incompleteUnsettled=false, initialDeliveryCount=null,
maxMessageSize=18446744073709551615, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
[1576319144:1] <- Flow{nextIncomingId=1, incomingWindow=5000, nextOutgoingId=1,
outgoingWindow=2147483647, handle=0, deliveryCount=0, linkCredit=100,
available=0, drain=false, echo=false, properties=null}
[1576319144:1] <- Attach{name='cbs:receiver', handle=1, role=SENDER,
sndSettleMode=SETTLED, rcvSettleMode=FIRST, source=Source{address='$cbs',
durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false,
dynamicNodeProperties=null, distributionMode=null, filter=null,
defaultOutcome=null, outcomes=null, capabilities=null},
target=Target{address='cbs-client-reply-to', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null,
capabilities=null}, unsettled=null, incompleteUnsettled=false,
initialDeliveryCount=0, maxMessageSize=18446744073709551615,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
INFO 2021-07-28 17:06:50,555 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.SendLinkHandler: onLinkRemoteOpen
connectionId[MF_e00897_1627488409363], entityPath[$cbs], linkName[cbs:sender],
remoteTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
INFO 2021-07-28 17:06:50,557 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.ReceiveLinkHandler: onLinkRemoteOpen
connectionId[MF_e00897_1627488409363], entityPath[$cbs],
linkName[cbs:receiver], remoteSource[Source{address='$cbs', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null,
distributionMode=null, filter=null, defaultOutcome=null, outcomes=null,
capabilities=null}]
INFO 2021-07-28 17:06:50,557 [reactor-executor-1] class
com.azure.core.amqp.implementation.RequestResponseChannel:$cbs:
namespace[MF_e00897_1627488409363] entityPath[$cbs]: Channel is now active.
[1576319144:1] -> Flow{nextIncomingId=1, incomingWindow=2147483647,
nextOutgoingId=1, outgoingWindow=2147483647, handle=1, deliveryCount=0,
linkCredit=1, available=null, drain=false, echo=false, properties=null}
[1576319144:1] -> Transfer{handle=0, deliveryId=0,
deliveryTag=952fc94ba10148778a253bd1503082d8, messageFormat=0, settled=true,
more=false, rcvSettleMode=null, state=null, resume=false, aborted=false,
batchable=false} (405)
"\x00Ss\xd0\x00\x00\x00\x1e\x00\x00\x00\x05S\x01@@@\xa1\x13cbs-client-reply-to\x00St\xc1\xa0\x08\xa1\x04name\xa1Eamqp://****.servicebus.windows.net/test-order\xa1\x0aexpiration\x83\x00\x00\x01z\xed\xf0\xab\x1f\xa1\x04type\xa1\x1fservicebus.windows.net:sastoken\xa1\x09operation\xa1\x09put-token\x00Sw\xa1\xc5SharedAccessSignature
sr=amqp%3A%2F%2F****.servicebus.windows.net%2Ftest-order&sig=MhLwCMWXckuAhscdCfxcvcSar1EBDtXWWVydu1EsfnE%3D&se=1627489610&skn=****"
[1576319144:1] <- Transfer{handle=1, deliveryId=0,
deliveryTag=\x01\x00\x00\x00, messageFormat=0, settled=true, more=false,
rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false}
(74)
"\x00Ss\xc0\x0f\x0d@@@@@S\x01@@@@@@@\x00St\xc11\x04\xa1\x0bstatus-codeq\x00\x00\x00\xca\xa1\x12status-description\xa1\x08Accepted"
INFO 2021-07-28 17:06:50,613 [reactor-executor-1]
com.azure.core.amqp.implementation.ActiveClientTokenManager: Scheduling refresh
token task. scopes[amqp://****.servicebus.windows.net/test-order]
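To help read the put-token exchange above: the "SharedAccessSignature sr=...&sig=...&se=...&skn=..." string sent to $cbs has the usual Service Bus SAS token layout. The sketch below shows how such a token is commonly computed (HMAC-SHA256 over the URL-encoded resource URI, a newline, and the expiry in epoch seconds). The class name SasTokenSketch and all input values are placeholders of mine, and this is not the SDK's own code.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical helper for illustration; not taken from the Azure SDK. */
final class SasTokenSketch {

    static String create(String resourceUri, String keyName, String key, long expiryEpochSeconds) {
        try {
            String encodedUri = URLEncoder.encode(resourceUri, "UTF-8");
            // String to sign: encoded resource URI, newline, expiry (epoch seconds).
            String stringToSign = encodedUri + "\n" + expiryEpochSeconds;

            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            String signature = Base64.getEncoder()
                .encodeToString(mac.doFinal(stringToSign.getBytes(StandardCharsets.UTF_8)));

            // Same field order as the token in the log: sr, sig, se, skn.
            return "SharedAccessSignature sr=" + encodedUri
                + "&sig=" + URLEncoder.encode(signature, "UTF-8")
                + "&se=" + expiryEpochSeconds
                + "&skn=" + keyName;
        } catch (GeneralSecurityException | UnsupportedEncodingException e) {
            throw new IllegalStateException("Unable to create SAS token", e);
        }
    }

    public static void main(String[] args) {
        // Placeholder namespace, policy name, and key.
        System.out.println(create(
            "amqp://example.servicebus.windows.net/test-order",
            "example-policy",
            "example-key",
            1627489610L));
    }
}
```

The expiry in the log (se=1627489610) is 20 minutes after the time of the request, which is why the SDK then schedules a token refresh task.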
INFO 2021-07-28 17:06:50,635 [reactor-executor-1]
com.azure.core.amqp.implementation.ReactorSession:
connectionId[MF_e00897_1627488409363] sessionId[test-order] linkName[11]
Creating a new receiver link.
[1576319144:0] -> Attach{name='11', handle=0, role=RECEIVER,
sndSettleMode=UNSETTLED, rcvSettleMode=SECOND,
source=Source{address='test-order', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null,
filter={com.microsoft:session-filter=11}, defaultOutcome=null, outcomes=null,
capabilities=null}, target=Target{address='null', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null,
capabilities=null}, unsettled=null, incompleteUnsettled=false,
initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null,
desiredCapabilities=null, properties={com.microsoft:timeout=59000,
com.microsoft:entity-type=0}}
[1576319144:0] <- Attach{name='11', handle=0, role=SENDER,
sndSettleMode=UNSETTLED, rcvSettleMode=SECOND,
source=Source{address='test-order', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null,
filter={com.microsoft:session-filter=11}, defaultOutcome=null, outcomes=null,
capabilities=null}, target=Target{address='null', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null,
capabilities=null}, unsettled=null, incompleteUnsettled=false,
initialDeliveryCount=0, maxMessageSize=18446744073709551615,
offeredCapabilities=[SHARED-SUBS], desiredCapabilities=null,
properties={com.microsoft:timeout=58980, com.microsoft:entity-type=0,
client-id=MF_e00897_1627488409363,
com.microsoft:locked-until-utc=637630852406531084}}
INFO 2021-07-28 17:06:50,671 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.ReceiveLinkHandler: onLinkRemoteOpen
connectionId[MF_e00897_1627488409363], entityPath[test-order], linkName[11],
remoteSource[Source{address='test-order', durable=NONE,
expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null,
distributionMode=null, filter={com.microsoft:session-filter=11},
defaultOutcome=null, outcomes=null, capabilities=null}]
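A note on the com.microsoft:locked-until-utc=637630852406531084 property in the broker's Attach above: I am assuming it is a .NET-style tick count (100 ns units since 0001-01-01T00:00:00Z), which fits the timestamps in this log. Under that assumption, a minimal sketch of the conversion is below. The class name LockedUntilSketch is hypothetical.

```java
import java.time.Instant;

/** Hypothetical helper; assumes the value is a .NET tick count (100 ns units). */
final class LockedUntilSketch {

    // Ticks between 0001-01-01T00:00:00Z and 1970-01-01T00:00:00Z.
    static final long TICKS_AT_UNIX_EPOCH = 621_355_968_000_000_000L;
    static final long TICKS_PER_SECOND = 10_000_000L;

    static Instant fromTicks(long ticks) {
        long sinceEpoch = ticks - TICKS_AT_UNIX_EPOCH;
        long seconds = Math.floorDiv(sinceEpoch, TICKS_PER_SECOND);
        long nanos = Math.floorMod(sinceEpoch, TICKS_PER_SECOND) * 100L; // 1 tick = 100 ns
        return Instant.ofEpochSecond(seconds, nanos);
    }

    public static void main(String[] args) {
        // Value copied from the Attach frame in the log.
        System.out.println(fromTicks(637_630_852_406_531_084L));
        // prints 2021-07-28T16:07:20.653108400Z
    }
}
```

That is about 30 seconds after the link was attached (17:06:50 local time, 16:06:50 UTC), which would match a 30 second session lock.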
2021-07-28 17:06:50,719 Log4j2-TF-1-AsyncLoggerConfig-1 ERROR An exception
occurred processing Appender Console java.lang.UnsupportedOperationException:
Although QueueSubscription extends Queue it is purely internal and only
guarantees support for poll/clear/size/isEmpty. Instances shouldn't be
used/exposed as Queue outside of Reactor operators.
at reactor.core.Fuseable$QueueSubscription.iterator(Fuseable.java:145)
at
org.apache.logging.log4j.message.ParameterFormatter.appendCollection(ParameterFormatter.java:599)
at
org.apache.logging.log4j.message.ParameterFormatter.appendPotentiallyRecursiveValue(ParameterFormatter.java:507)
at
org.apache.logging.log4j.message.ParameterFormatter.recursiveDeepToString(ParameterFormatter.java:432)
at
org.apache.logging.log4j.message.ParameterFormatter.formatMessage2(ParameterFormatter.java:189)
at
org.apache.logging.log4j.message.ParameterizedMessage.formatTo(ParameterizedMessage.java:225)
at
org.apache.logging.log4j.core.pattern.MessagePatternConverter.format(MessagePatternConverter.java:119)
at
org.apache.logging.log4j.core.pattern.PatternFormatter.format(PatternFormatter.java:38)
at
org.apache.logging.log4j.core.layout.PatternLayout$PatternSerializer.toSerializable(PatternLayout.java:333)
at
org.apache.logging.log4j.core.layout.PatternLayout.toText(PatternLayout.java:232)
at
org.apache.logging.log4j.core.layout.PatternLayout.encode(PatternLayout.java:217)
at
org.apache.logging.log4j.core.layout.PatternLayout.encode(PatternLayout.java:57)
at
org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.directEncodeEvent(AbstractOutputStreamAppender.java:177)
at
org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.tryAppend(AbstractOutputStreamAppender.java:170)
at
org.apache.logging.log4j.core.appender.AbstractOutputStreamAppender.append(AbstractOutputStreamAppender.java:161)
at
org.apache.logging.log4j.core.config.AppenderControl.tryCallAppender(AppenderControl.java:156)
at
org.apache.logging.log4j.core.config.AppenderControl.callAppender0(AppenderControl.java:129)
at
org.apache.logging.log4j.core.config.AppenderControl.callAppenderPreventRecursion(AppenderControl.java:120)
at
org.apache.logging.log4j.core.config.AppenderControl.callAppender(AppenderControl.java:84)
at
org.apache.logging.log4j.core.config.LoggerConfig.callAppenders(LoggerConfig.java:448)
at
org.apache.logging.log4j.core.async.AsyncLoggerConfig.asyncCallAppenders(AsyncLoggerConfig.java:115)
at
org.apache.logging.log4j.core.async.AsyncLoggerConfigDisruptor$Log4jEventWrapperHandler.onEvent(AsyncLoggerConfigDisruptor.java:112)
at
org.apache.logging.log4j.core.async.AsyncLoggerConfigDisruptor$Log4jEventWrapperHandler.onEvent(AsyncLoggerConfigDisruptor.java:98)
at
com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
INFO 2021-07-28 17:06:50,722 [reactor-executor-1]
com.azure.core.amqp.implementation.ReactorSession: linkName[11]
entityPath[test-order] Returning existing receive link.
[1576319144:0] -> Flow{nextIncomingId=1, incomingWindow=2147483647,
nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=0,
linkCredit=1, available=null, drain=false, echo=false, properties=null}
[1576319144:0] <- Transfer{handle=0, deliveryId=0,
deliveryTag=\xb1\xe8\xb6\xd8\x9e\xf4\x9bM\x8eJ\xd3\x07\xad2\xa6$,
messageFormat=0, settled=null, more=false, rcvSettleMode=null, state=null,
resume=false, aborted=false, batchable=true} (356)
"\x00Sp\xc0\x0a\x05A@pH\x19\x08\x00@C\x00Sq\xc1$\x02\xa3\x10x-opt-lock-token\x98\xd8\xb6\xe8\xb1\xf4\x9eM\x9b\x8eJ\xd3\x07\xad2\xa6$\x00Sr\xc1\x9d\x0c\xa3\x0ex-opt-jms-destQ\x00\xa3\x12x-opt-jms-msg-typeQ\x03\xa3\x13x-opt-enqueued-time\x83\x00\x00\x01z\xed\xd6z(\xa3\x15x-opt-sequence-number\x81\x00\x00\x00\x00\x00\x00\x03\x87\xa3\x13x-opt-partition-key\xa1\x0211\xa3\x12x-opt-locked-until\x83\x00\x00\xe6w\xd2\x1f\xdc\x00\x00Ss\xc0u\x0d\xa1/ID:625d06cd-e2f9-4982-820e-ee471d9bd4a6:1:1:1-1@\xa1\x0atest-order@@@\xa3\x18application/octet-stream@\x83\x00\x00\x01{5\xef\x82!\x83\x00\x00\x01z\xed\xd6z!\xa1\x0211@@\x00Su\xa0\x0bHello
World"
** SessinID **11
** SequenceNumber **903
** Body **Hello World
[1576319144:0] -> Disposition{role=RECEIVER, first=0, last=0, settled=false,
state=Accepted{}, batchable=false}
[1576319144:0] <- Disposition{role=SENDER, first=0, last=null, settled=true,
state=Accepted{}, batchable=false}
[1576319144:0] -> Flow{nextIncomingId=2, incomingWindow=2147483647,
nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=1,
linkCredit=1, available=null, drain=false, echo=false, properties=null}
[1576319144:0] <- Transfer{handle=0, deliveryId=1,
deliveryTag=\xc6"\xca\x1e)\xc5\x89B\xb1\xd3w\x9a\x04>#\xb8, messageFormat=0,
settled=null, more=false, rcvSettleMode=null, state=null, resume=false,
aborted=false, batchable=true} (356)
"\x00Sp\xc0\x0a\x05A@pH\x19\x08\x00@C\x00Sq\xc1$\x02\xa3\x10x-opt-lock-token\x98\x1e\xca"\xc6\xc5)B\x89\xb1\xd3w\x9a\x04>#\xb8\x00Sr\xc1\x9d\x0c\xa3\x0ex-opt-jms-destQ\x00\xa3\x12x-opt-jms-msg-typeQ\x03\xa3\x13x-opt-enqueued-time\x83\x00\x00\x01z\xed\xd6{Q\xa3\x15x-opt-sequence-number\x81\x00\x00\x00\x00\x00\x00\x03\x88\xa3\x13x-opt-partition-key\xa1\x0211\xa3\x12x-opt-locked-until\x83\x00\x00\xe6w\xd2\x1f\xdc\x00\x00Ss\xc0u\x0d\xa1/ID:54b29b8b-77c3-4cc8-8e6c-af1ff531ad19:1:1:1-1@\xa1\x0atest-order@@@\xa3\x18application/octet-stream@\x83\x00\x00\x01{5\xef\x83S\x83\x00\x00\x01z\xed\xd6{S\xa1\x0211@@\x00Su\xa0\x0bHello
World"
** SessinID **11
** SequenceNumber **904
** Body **Hello World
[1576319144:0] -> Disposition{role=RECEIVER, first=1, last=1, settled=false,
state=Accepted{}, batchable=false}
[1576319144:0] <- Disposition{role=SENDER, first=1, last=null, settled=true,
state=Accepted{}, batchable=false}
[1576319144:0] -> Flow{nextIncomingId=3, incomingWindow=2147483647,
nextOutgoingId=1, outgoingWindow=2147483647, handle=0, deliveryCount=2,
linkCredit=1, available=null, drain=false, echo=false, properties=null}
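As a cross-check between the raw Transfer payloads and the printed output above: the x-opt-sequence-number annotation is followed by the bytes \x81\x00\x00\x00\x00\x00\x00\x03\x87 in the first message and ...\x03\x88 in the second. In AMQP 1.0, format code 0x81 is a 64-bit signed long in network byte order, so a small sketch like the following (class name AmqpLongSketch is hypothetical, and it decodes only this one type) recovers the sequence numbers 903 and 904.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper; decodes only the AMQP 1.0 "long" primitive (format code 0x81). */
final class AmqpLongSketch {

    static final byte LONG_FORMAT_CODE = (byte) 0x81;

    static long decodeLong(byte[] encoded) {
        ByteBuffer buf = ByteBuffer.wrap(encoded); // ByteBuffer is big-endian by default
        byte code = buf.get();
        if (code != LONG_FORMAT_CODE) {
            throw new IllegalArgumentException(
                String.format("Not an AMQP long, format code 0x%02x", code & 0xff));
        }
        return buf.getLong();
    }

    public static void main(String[] args) {
        // Bytes copied from the two Transfer frames in the log.
        byte[] first = {(byte) 0x81, 0, 0, 0, 0, 0, 0, 0x03, (byte) 0x87};
        byte[] second = {(byte) 0x81, 0, 0, 0, 0, 0, 0, 0x03, (byte) 0x88};
        System.out.println(decodeLong(first));  // prints 903
        System.out.println(decodeLong(second)); // prints 904
    }
}
```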
[1576319144:0] -> Detach{handle=0, closed=true, error=null}
INFO 2021-07-28 17:06:59,547 [main]
com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient: Removing receiver
links.
INFO 2021-07-28 17:06:59,548 [main]
com.azure.messaging.servicebus.ServiceBusClientBuilder: Closing a dependent
client. # of open clients: 0
INFO 2021-07-28 17:06:59,549 [main]
com.azure.messaging.servicebus.ServiceBusClientBuilder: No more open clients,
closing shared connection [ServiceBusConnectionProcessor].
INFO 2021-07-28 17:06:59,549 [main]
com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor:
Upstream connection publisher was completed. Terminating processor.
INFO 2021-07-28 17:06:59,549 [main]
com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor:
namespace[] entityPath[****.servicebus.windows.net]: AMQP channel processor
completed. Notifying 0 subscribers.
INFO 2021-07-28 17:06:59,549 [main]
com.azure.core.amqp.implementation.ReactorConnection:
connectionId[MF_e00897_1627488409363] signal[Disposed by client.,
isTransient[false], initiatedByClient[true]]: Disposing of ReactorConnection.
INFO 2021-07-28 17:06:59,549 [main]
com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor:
namespace[] entityPath[****.servicebus.windows.net]: Channel is disposed.
[1576319144:0] -> End{error=null}
[1576319144:1] -> End{error=null}
[1576319144:0] <- Detach{handle=0, closed=true, error=null}
INFO 2021-07-28 17:06:59,618 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.ReceiveLinkHandler:
onLinkRemoteClose connectionId[MF_e00897_1627488409363] linkName[11],
errorCondition[null] errorDescription[null]
INFO 2021-07-28 17:06:59,621 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.ReceiveLinkHandler: onLinkFinal
connectionId[MF_e00897_1627488409363], linkName[11]
[1576319144:0] <- End{error=null}
INFO 2021-07-28 17:06:59,632 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.SessionHandler: onSessionRemoteClose
connectionId[test-order], entityName[MF_e00897_1627488409363],
condition[Error{condition=null, description='null', info=null}]
[1576319144:1] <- End{error=null}
INFO 2021-07-28 17:06:59,633 [reactor-executor-1]
com.azure.core.amqp.implementation.handler.SessionHandler: onSessionRemoteClose
connectionId[cbs-session], entityName[MF_e00897_1627488409363],
condition[Error{condition=null, description='null', info=null}]