This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a18eb46 PROTON-2887 Ensure buffer release on exception
5a18eb46 is described below

commit 5a18eb46af126e1b4a084a886c8dee6c9a79b85d
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Tue Apr 22 18:42:30 2025 -0400

    PROTON-2887 Ensure buffer release on exception
    
    Clean up the frame parser so that the payload buffer is closed if an
    exception is thrown, which guarantees release of the reference-counted
    backing buffer.
---
 .../engine/impl/ProtonFrameDecodingHandler.java    | 107 +++++++++++++--------
 1 file changed, 66 insertions(+), 41 deletions(-)

diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
index 09f72836..58d4e013 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
@@ -62,6 +62,11 @@ public class ProtonFrameDecodingHandler implements 
EngineHandler, SaslPerformati
      */
     public static final int FRAME_SIZE_BYTES = 4;
 
+    /**
+     * Size of the initial portion of an encoded frame before any extended 
header portion.
+     */
+    private static final int FRAME_PREAMBLE_BYTES = 8;
+
     private final AMQPPerformativeEnvelopePool<IncomingAMQPEnvelope> framePool 
= AMQPPerformativeEnvelopePool.incomingEnvelopePool();
 
     private Decoder decoder;
@@ -339,63 +344,83 @@ public class ProtonFrameDecodingHandler implements 
EngineHandler, SaslPerformati
 
         @Override
         public void parse(EngineHandlerContext context, ProtonBuffer input) {
-            int dataOffset = (input.readByte() << 2) & 0x3FF;
-            int frameSize = length + FRAME_SIZE_BYTES;
+            final int dataOffset = (input.readByte() << 2) & 0x3FF;
+            final int frameSize = length + FRAME_SIZE_BYTES;
+            final int frameBodySize = frameSize - dataOffset;
 
             validateDataOffset(dataOffset, frameSize);
 
-            int type = input.readByte() & 0xFF;
-            short channel = input.readShort();
+            final int type = input.readByte() & 0xFF;
+            final short channel = input.readShort();
 
-            // Skip over the extended header if present (i.e offset > 8)
-            if (dataOffset != 8) {
-                input.advanceReadOffset(dataOffset - 8);
+            // Skip over the extended header if present (i.e. offset > 
FRAME_PREAMBLE_BYTES)
+            if (dataOffset != FRAME_PREAMBLE_BYTES) {
+                input.advanceReadOffset(dataOffset - FRAME_PREAMBLE_BYTES);
             }
 
-            final int frameBodySize = frameSize - dataOffset;
-
-            ProtonBuffer payload = null;
-            Object val = null;
-
             if (frameBodySize > 0) {
-                int startReadIndex = input.getReadOffset();
-                val = decoder.readObject(input, decoderState);
-
-                // Copy the payload portion of the incoming bytes for now as 
the incoming may be
-                // from a wrapped pooled buffer and for now we have no way of 
retaining or otherwise
-                // ensuring that the buffer remains ours.  Since we might want 
to store received
-                // data at a client level and decode later we could end up 
losing the data to reuse
-                // if it was pooled.
-                if (input.isReadable()) {
-                    int payloadSize = frameBodySize - (input.getReadOffset() - 
startReadIndex);
-                    if (payloadSize > 0) {
-                        // Payload is now only a view of the bytes from the 
input that comprise it.
-                        payload = input.copy(input.getReadOffset(), 
payloadSize, true);
-                        input.advanceReadOffset(payloadSize);
-                    }
+                switch (type) {
+                    case AMQP_FRAME_TYPE:
+                        handleAMQPPerformative(context, channel, 
frameBodySize, input);
+                        break;
+                    case SASL_FRAME_TYPE:
+                        handleSASLPerformative(context, input);
+                        break;
+                    default:
+                        throw new 
FrameDecodingException(String.format("unknown frame type: %d", type));
                 }
             } else {
-                transitionToFrameSizeParsingStage();
-                context.fireRead(EmptyEnvelope.INSTANCE);
-                return;
+                handleEmptyFrame(context);
             }
+        }
+
+        private void handleEmptyFrame(EngineHandlerContext context) {
+            transitionToFrameSizeParsingStage();
+            context.fireRead(EmptyEnvelope.INSTANCE);
+        }
+
+        private void handleAMQPPerformative(EngineHandlerContext context, 
short channel, int frameBodySize, ProtonBuffer input) {
+            final int startReadIndex = input.getReadOffset();
+            final Performative performative = decoder.readObject(input, 
decoderState, Performative.class);
+
+            // Copy the payload portion of the incoming bytes for now as the 
incoming may be from a
+            // wrapped pooled buffer and for now we have no way of retaining 
or otherwise ensuring
+            // that the buffer remains ours. Since we might want to store 
received data at a client
+            // level and decode later we could end up losing the data to reuse 
if it was pooled.
+            if (input.isReadable()) {
+                final ProtonBuffer payload;
+                final int payloadSize = frameBodySize - (input.getReadOffset() 
- startReadIndex);
+
+                if (payloadSize > 0) {
+                    // The payload buffer is now only a read-only view of the 
bytes from the input that comprise it.
+                    payload = input.copy(input.getReadOffset(), payloadSize, 
true);
+                    input.advanceReadOffset(payloadSize);
+                } else {
+                    payload = null;
+                }
 
-            if (type == AMQP_FRAME_TYPE) {
-                Performative performative = (Performative) val;
-                IncomingAMQPEnvelope frame = framePool.take(performative, 
channel, payload);
-                transitionToFrameSizeParsingStage();
-                context.fireRead(frame);
-            } else if (type == SASL_FRAME_TYPE) {
-                SaslPerformative performative = (SaslPerformative) val;
-                SASLEnvelope saslFrame = new SASLEnvelope(performative);
                 transitionToFrameSizeParsingStage();
-                // Ensure we process transition from SASL to AMQP header state
-                handleRead(context, saslFrame);
+                try {
+                    context.fireRead(framePool.take(performative, channel, 
payload));
+                } catch (Exception ex) {
+                    if (payload != null) {
+                        payload.close();
+                    }
+                }
             } else {
-                throw new FrameDecodingException(String.format("unknown frame 
type: %d", type));
+                transitionToFrameSizeParsingStage();
+                context.fireRead(framePool.take(performative, channel, null));
             }
         }
 
+        private void handleSASLPerformative(EngineHandlerContext context, 
ProtonBuffer input) {
+            final SaslPerformative performative = (SaslPerformative) 
decoder.readObject(input, decoderState);;
+
+            transitionToFrameSizeParsingStage();
+            // Ensure we process transition from SASL to AMQP header state
+            handleRead(context, new SASLEnvelope(performative));
+        }
+
         private void validateDataOffset(int dataOffset, int frameSize) throws 
FrameDecodingException {
             if (dataOffset < 8) {
                 throw new FrameDecodingException(String.format(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to