This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton-j.git
The following commit(s) were added to refs/heads/main by this push: new c7f3311 PROTON-2416: frame writer should reset its position on failure c7f3311 is described below commit c7f3311426d787fecdbb697f656fc76401997544 Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Mon Aug 23 16:33:37 2021 +0100 PROTON-2416: frame writer should reset its position on failure --- .../qpid/proton/engine/impl/FrameWriter.java | 34 ++++++++++--------- .../qpid/proton/engine/impl/FrameWriterTest.java | 38 ++++++++++++++++++++++ 2 files changed, 57 insertions(+), 15 deletions(-) diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java index 4c037ac..2ec13c1 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameWriter.java @@ -99,25 +99,29 @@ class FrameWriter { void writeFrame(int channel, Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge) { frameStart = frameBuffer.position(); + try { + final int performativeSize = writePerformative(frameBody, payload, onPayloadTooLarge); + final int capacity = maxFrameSize > 0 ? maxFrameSize - performativeSize : Integer.MAX_VALUE; + final int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity); - final int performativeSize = writePerformative(frameBody, payload, onPayloadTooLarge); - final int capacity = maxFrameSize > 0 ? maxFrameSize - performativeSize : Integer.MAX_VALUE; - final int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity); - - if (transport.isFrameTracingEnabled()) { - logFrame(channel, frameBody, payload, payloadSize); - } + if (transport.isFrameTracingEnabled()) { + logFrame(channel, frameBody, payload, payloadSize); + } - if (payloadSize > 0) { - int oldLimit = payload.limit(); - payload.limit(payload.position() + payloadSize); - frameBuffer.put(payload); - payload.limit(oldLimit); - } + if (payloadSize > 0) { + int oldLimit = payload.limit(); + payload.limit(payload.position() + payloadSize); + frameBuffer.put(payload); + payload.limit(oldLimit); + } - endFrame(channel); + endFrame(channel); - framesOutput++; + framesOutput++; + } catch (Exception e) { + frameBuffer.position(frameStart); + throw e; + } } private int writePerformative(Object frameBody, ReadableBuffer payload, Runnable onPayloadTooLarge) { diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterTest.java index dd93304..f0ac2f1 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameWriterTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.spy; @@ -29,14 +30,19 @@ import static org.mockito.Mockito.verify; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.security.SaslFrameBody; import org.apache.qpid.proton.amqp.security.SaslInit; +import org.apache.qpid.proton.amqp.transport.Open; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.Transfer; import org.apache.qpid.proton.codec.AMQPDefinedTypes; @@ -107,6 +113,38 @@ public class FrameWriterTest { } @Test + public void testFailToWriteFrame() { + FrameWriter framer = new FrameWriter(encoder, Integer.MAX_VALUE, (byte) 1, transport); + + final class FailOnUnknownType implements Function<Boolean, Boolean> { + @Override + public Boolean apply(Boolean t) { + throw new IllegalStateException(); + } + }; + + Open open = new Open(); + Map<Symbol, Object> invalidProperties = new HashMap<>(); + invalidProperties.put(Symbol.valueOf("invalid-unknown-type"), new FailOnUnknownType()); + open.setProperties(invalidProperties); + + try { + framer.writeFrame(0, open, null, null); + fail("should have thrown exception"); + } catch (IllegalArgumentException e) { + // Expected + assertNotNull(e.getMessage()); + assertTrue(e.getMessage().contains(FailOnUnknownType.class.getName())); + } + + ByteBuffer destBuffer = ByteBuffer.allocate(16); + int read = framer.readBytes(destBuffer); + + assertEquals("should not have been any output read", 0, read); + assertEquals("should not have been any output in buffer", 0, destBuffer.position()); + } + + @Test public void testFrameWrittenToBufferWithLargePayloadAndMaxFrameSizeInvokesHandlerOnce() { Transfer transfer = createTransfer(); FrameWriter framer = new FrameWriter(encoder, 2048, (byte) 0, transport); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org