This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new 5a18eb46 PROTON-2887 Ensure buffer release on exception 5a18eb46 is described below commit 5a18eb46af126e1b4a084a886c8dee6c9a79b85d Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Tue Apr 22 18:42:30 2025 -0400 PROTON-2887 Ensure buffer release on exception Some code cleanups in the frame parser to ensure the payload buffer is closed in case of exception to ensure release of a backing buffer that is being reference counted --- .../engine/impl/ProtonFrameDecodingHandler.java | 107 +++++++++++++-------- 1 file changed, 66 insertions(+), 41 deletions(-) diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java index 09f72836..58d4e013 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java @@ -62,6 +62,11 @@ public class ProtonFrameDecodingHandler implements EngineHandler, SaslPerformati */ public static final int FRAME_SIZE_BYTES = 4; + /** + * Size of the initial portion of an encoded frame before any extended header portion. 
+ */ + private static final int FRAME_PREAMBLE_BYTES = 8; + private final AMQPPerformativeEnvelopePool<IncomingAMQPEnvelope> framePool = AMQPPerformativeEnvelopePool.incomingEnvelopePool(); private Decoder decoder; @@ -339,63 +344,83 @@ public class ProtonFrameDecodingHandler implements EngineHandler, SaslPerformati @Override public void parse(EngineHandlerContext context, ProtonBuffer input) { - int dataOffset = (input.readByte() << 2) & 0x3FF; - int frameSize = length + FRAME_SIZE_BYTES; + final int dataOffset = (input.readByte() << 2) & 0x3FF; + final int frameSize = length + FRAME_SIZE_BYTES; + final int frameBodySize = frameSize - dataOffset; validateDataOffset(dataOffset, frameSize); - int type = input.readByte() & 0xFF; - short channel = input.readShort(); + final int type = input.readByte() & 0xFF; + final short channel = input.readShort(); - // Skip over the extended header if present (i.e offset > 8) - if (dataOffset != 8) { - input.advanceReadOffset(dataOffset - 8); + // Skip over the extended header if present (i.e. offset > FRAME_PREAMBLE_BYTES) + if (dataOffset != FRAME_PREAMBLE_BYTES) { + input.advanceReadOffset(dataOffset - FRAME_PREAMBLE_BYTES); } - final int frameBodySize = frameSize - dataOffset; - - ProtonBuffer payload = null; - Object val = null; - if (frameBodySize > 0) { - int startReadIndex = input.getReadOffset(); - val = decoder.readObject(input, decoderState); - - // Copy the payload portion of the incoming bytes for now as the incoming may be - // from a wrapped pooled buffer and for now we have no way of retaining or otherwise - // ensuring that the buffer remains ours. Since we might want to store received - // data at a client level and decode later we could end up losing the data to reuse - // if it was pooled. - if (input.isReadable()) { - int payloadSize = frameBodySize - (input.getReadOffset() - startReadIndex); - if (payloadSize > 0) { - // Payload is now only a view of the bytes from the input that comprise it. 
- payload = input.copy(input.getReadOffset(), payloadSize, true); - input.advanceReadOffset(payloadSize); - } + switch (type) { + case AMQP_FRAME_TYPE: + handleAMQPPerformative(context, channel, frameBodySize, input); + break; + case SASL_FRAME_TYPE: + handleSASLPerformative(context, input); + break; + default: + throw new FrameDecodingException(String.format("unknown frame type: %d", type)); } } else { - transitionToFrameSizeParsingStage(); - context.fireRead(EmptyEnvelope.INSTANCE); - return; + handleEmptyFrame(context); } + } + + private void handleEmptyFrame(EngineHandlerContext context) { + transitionToFrameSizeParsingStage(); + context.fireRead(EmptyEnvelope.INSTANCE); + } + + private void handleAMQPPerformative(EngineHandlerContext context, short channel, int frameBodySize, ProtonBuffer input) { + final int startReadIndex = input.getReadOffset(); + final Performative performative = decoder.readObject(input, decoderState, Performative.class); + + // Copy the payload portion of the incoming bytes for now as the incoming may be from a + // wrapped pooled buffer and for now we have no way of retaining or otherwise ensuring + // that the buffer remains ours. Since we might want to store received data at a client + // level and decode later we could end up losing the data to reuse if it was pooled. + if (input.isReadable()) { + final ProtonBuffer payload; + final int payloadSize = frameBodySize - (input.getReadOffset() - startReadIndex); + + if (payloadSize > 0) { + // The payload buffer is now only a read-only view of the bytes from the input that comprise it. 
+ payload = input.copy(input.getReadOffset(), payloadSize, true); + input.advanceReadOffset(payloadSize); + } else { + payload = null; + } - if (type == AMQP_FRAME_TYPE) { - Performative performative = (Performative) val; - IncomingAMQPEnvelope frame = framePool.take(performative, channel, payload); - transitionToFrameSizeParsingStage(); - context.fireRead(frame); - } else if (type == SASL_FRAME_TYPE) { - SaslPerformative performative = (SaslPerformative) val; - SASLEnvelope saslFrame = new SASLEnvelope(performative); transitionToFrameSizeParsingStage(); - // Ensure we process transition from SASL to AMQP header state - handleRead(context, saslFrame); + try { + context.fireRead(framePool.take(performative, channel, payload)); + } catch (Exception ex) { + if (payload != null) { + payload.close(); + } + } } else { - throw new FrameDecodingException(String.format("unknown frame type: %d", type)); + transitionToFrameSizeParsingStage(); + context.fireRead(framePool.take(performative, channel, null)); } } + private void handleSASLPerformative(EngineHandlerContext context, ProtonBuffer input) { + final SaslPerformative performative = (SaslPerformative) decoder.readObject(input, decoderState);; + + transitionToFrameSizeParsingStage(); + // Ensure we process transition from SASL to AMQP header state + handleRead(context, new SASLEnvelope(performative)); + } + private void validateDataOffset(int dataOffset, int frameSize) throws FrameDecodingException { if (dataOffset < 8) { throw new FrameDecodingException(String.format( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org