On 3/15/24 15:42, Ciaran wrote:
On Fri, Mar 15, 2024 at 6:14 PM Timothy Bish <tabish...@gmail.com> wrote:

On 3/15/24 11:50, Ciaran wrote:
Hi,

I'm attempting to build a resilient client using the protonj2 client, and as
part of my testing I was checking that it would handle messages with null
bodies (specifically, my client is only willing to work with strings or byte
arrays, so it rejects any other AMQP message body type). However, when I put
a null body on the message (i.e. body length of 0) and then dereference the
delivery's message's body property (e.g. delivery.message().body), I receive
an error from the proton engine:
Can you capture a frame trace? I need enough information to try to
create an exact reproducer, which can be aided by traces of the
incoming frames to reverse engineer a test. I wouldn't expect an
exception here, and it's likely something that was missed in the current
test suite.

Code snippets of the sender and receiver bits can help as well.


Hi,
Please find below everything I think you've asked for. The real code is
somewhat distributed across several different classes, but this should be
pretty equivalent:

RECEIVER CODE
-------------
ConnectionOptions options = constructBasicConnectionOptions();
options.reconnectOptions()
        .reconnectEnabled(true)
        .useReconnectBackOff(true)
        .reconnectDelay(5000)
        .maxReconnectDelay(10240001)
        .maxReconnectAttempts(12)
        .warnAfterReconnectAttempts(1);

Client client = Client.create();
Connection connection = client.connect("host", 1234, options);
ReceiverOptions ro = new ReceiverOptions();
ro.autoAccept(false);
Receiver receiver = connection.openReceiver("queue", ro);

Delivery delivery = receiver.tryReceive();
if (delivery != null) {
    Message<Object> message = delivery.message();
    // Exception occurs here.
    Object body = message.body();
}
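
The "strings or byte arrays only" rule described earlier could be sketched roughly as follows. This is a minimal illustration and not the real client code: BodyCheck and its Kind enum are hypothetical names, it only looks at an already-decoded body object, and treating a zero-length byte array the same as null is an assumption. It also does not avoid the exception above, which is raised inside message.body() before any such check can run.

```java
/** Hypothetical helper, not part of protonj2: classifies an already-decoded body. */
final class BodyCheck {

    enum Kind { EMPTY, TEXT, BINARY, UNSUPPORTED }

    static Kind classify(Object body) {
        if (body == null) {
            return Kind.EMPTY;                 // no body at all
        }
        if (body instanceof String) {
            return Kind.TEXT;
        }
        if (body instanceof byte[]) {
            // Assumption: a zero-length array is treated like a missing body.
            return ((byte[]) body).length == 0 ? Kind.EMPTY : Kind.BINARY;
        }
        return Kind.UNSUPPORTED;               // maps, lists, numbers, etc. are rejected
    }

    public static void main(String[] args) {
        System.out.println(classify(null));
        System.out.println(classify("hello"));
        System.out.println(classify(new byte[] {1, 2, 3}));
        System.out.println(classify(Integer.valueOf(42)));
    }
}
```

In the receiver above, the result of message.body() would be passed to classify(...) and anything reported as UNSUPPORTED rejected.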

SENDER CODE
-----------
final Client client = Client.create();
final ConnectionOptions options = new ConnectionOptions();
options.sslOptions().sslEnabled(true);
options.user("MY_USER");
options.password("MY_PASSWORD");

Connection connection = client.connect(serverHost, serverPort, options);
Sender sender = connection.openSender(address);
sender.send(Message.create((byte[])null));

FRAME TRACE
------------
-> SASL:[269961536:0] AMQP,3,1,0,0
<- SASL:[269961536:0] AMQP,3,1,0,0
<- SASL:[269961536:0] SaslMechanisms{saslServerMechanisms=[MSSBCBS, PLAIN,
ANONYMOUS, EXTERNAL]}
19:36:21,633  WARN SaslMechanismSelector:130 - Caught exception while
trying to create SASL mechanism MSSBCBS: No Matching SASL Mechanism with
name: MSSBCBS
-> SASL:[269961536:0] SaslInit{mechanism=PLAIN,
initialResponse="\x00MY_USER\x00MY_PASSWORD"...(truncated), hostname='
MY_HOST.servicebus.windows.net'}
<- SASL:[269961536:0] SaslOutcome{code=OK, additionalData="Welcome!"}
-> AMQP:[269961536:0] AMQP,0,1,0,0
<- AMQP:[269961536:0] AMQP,0,1,0,0
-> AMQP:[269961536:0] Open{
containerId='ID:081fcabd-ac34-41b5-9bff-fe35d0cfbff1:1:2', hostname='
MY_HOST.servicebus.windows.net', maxFrameSize=65536, channelMax=65535,
idleTimeOut=60000, outgoingLocales=null, incomingLocales=null,
offeredCapabilities=null, desiredCapabilities=[ANONYMOUS-RELAY],
properties=null}
-> AMQP:[269961536:0] Begin{remoteChannel=null, nextOutgoingId=0,
incomingWindow=1600, outgoingWindow=2147483647, handleMax=null,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
-> AMQP:[269961536:0]
Attach{name='receiver-ID:081fcabd-ac34-41b5-9bff-fe35d0cfbff1:1:2:1:1',
handle=0, role=RECEIVER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='testq', durable=NONE, expiryPolicy=LINK_DETACH,
timeout=0, dynamic=false, dynamicNodeProperties=null,
distributionMode=null, filter=null,
defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=false,
messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list,
amqp:released:list, amqp:modified:list], capabilities=null},
target=Target{address='testq', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
unsettled=null, incompleteUnsettled=null, initialDeliveryCount=null,
maxMessageSize=null, offeredCapabilities=null, desiredCapabilities=null,
properties=null}
-> AMQP:[269961536:0] Flow{nextIncomingId=null, incomingWindow=1600,
nextOutgoingId=0, outgoingWindow=2147483647, handle=0, deliveryCount=null,
linkCredit=10, available=null, drain=false, echo=null, properties=null}
<- AMQP:[269961536:0] Open{
containerId='af1bcb999146429ea0a245db6affd5e4_G60', hostname='null',
maxFrameSize=65536, channelMax=4999, idleTimeOut=120000,
outgoingLocales=null, incomingLocales=null, offeredCapabilities=null,
desiredCapabilities=null, properties=null}
<- AMQP:[269961536:0] Begin{remoteChannel=0, nextOutgoingId=1,
incomingWindow=5000, outgoingWindow=1600, handleMax=255,
offeredCapabilities=null, desiredCapabilities=null, properties=null}
<- AMQP:[269961536:0]
Attach{name='receiver-ID:081fcabd-ac34-41b5-9bff-fe35d0cfbff1:1:2:1:1',
handle=0, role=SENDER, sndSettleMode=UNSETTLED, rcvSettleMode=FIRST,
source=Source{address='testq', durable=NONE, expiryPolicy=LINK_DETACH,
timeout=0, dynamic=false, dynamicNodeProperties=null,
distributionMode=null, filter=null,
defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=false,
messageAnnotations=null}, outcomes=[amqp:accepted:list, amqp:rejected:list,
amqp:released:list, amqp:modified:list], capabilities=null},
target=Target{address='testq', durable=NONE, expiryPolicy=SESSION_END,
timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
unsettled=null, incompleteUnsettled=null, initialDeliveryCount=0,
maxMessageSize=18446744073709551615, offeredCapabilities=[SHARED-SUBS],
desiredCapabilities=null, properties=null}
<- AMQP:[269961536:0] Transfer{handle=0, deliveryId=0,
deliveryTag=DeliveryTag: {[-57, -118, -50, -93, 16, -69, -2, 64, -67, -39,
124, -41, -79, 11, 124, 3]}, messageFormat=0, settled=null, more=false,
rcvSettleMode=null, state=null, resume=null, aborted=null, batchable=true}
- "\x00Sp\xc0\x0b\x05@@pH\x19\x08\x00@R
\x02\x00Sq\xc1$\x02\xa3\x10x"...(truncated)
-> AMQP:[269961536:0] Detach{handle=0, closed=true, error=null}
<- AMQP:[269961536:0] Detach{handle=0, closed=true, error=null}
-> AMQP:[269961536:0] Close{error=null}
<- AMQP:[269961536:0] Close{error=null}

Thank you.

I tried to reproduce it and can't, which likely means I'd need to recreate the exact sequence of bytes the receiver handles to see whether there is an issue in the compositing of the split frames. The exception shows a composite buffer in use, which usually implies the message arrived in chunks. A basic split write of a Transfer with a message payload of a null in the AmqpValue didn't show any issues for me.

You can capture the IO level data by enabling the following in the ConnectionOptions:

   options.transportOptions().traceBytes(true);
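
To build a reproducer from captured payload text, the escaped form seen in the frame trace above (printable ASCII mixed with \xNN escapes) has to be turned back into raw bytes. The following is only a sketch under that assumption about the escaping. TraceBytes is a hypothetical helper, not something the client provides, and the byte-level trace output may be formatted differently.

```java
import java.io.ByteArrayOutputStream;

/** Hypothetical helper: converts trace-style escaped payload text back into bytes. */
final class TraceBytes {

    static byte[] decode(String escaped) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int i = 0;
        while (i < escaped.length()) {
            char c = escaped.charAt(i);
            // A backslash, 'x' and two hex digits encode a single byte.
            if (c == '\\' && i + 3 < escaped.length() && escaped.charAt(i + 1) == 'x') {
                int hi = Character.digit(escaped.charAt(i + 2), 16);
                int lo = Character.digit(escaped.charAt(i + 3), 16);
                if (hi >= 0 && lo >= 0) {
                    out.write((hi << 4) | lo);
                    i += 4;
                    continue;
                }
            }
            // Assumption: everything else is a plain ASCII character standing for itself.
            if (c > 0x7F) {
                throw new IllegalArgumentException("Non-ASCII character at index " + i);
            }
            out.write(c);
            i++;
        }
        return out.toByteArray();
    }

    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02x", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // First ten bytes of the Transfer payload shown in the trace.
        System.out.println(toHex(decode("\\x00Sp\\xc0\\x0b\\x05@@pH")));
        // prints: 00 53 70 c0 0b 05 40 40 70 48
    }
}
```

The leading 00 53 70 is the described-type marker followed by the descriptor of the AMQP header section. The payload in the trace is truncated, so a full capture is still needed to replay the exact bytes.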


--
Tim Bish

