[ 
https://issues.apache.org/jira/browse/ARTEMIS-3449?focusedWorklogId=643526&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-643526
 ]

ASF GitHub Bot logged work on ARTEMIS-3449:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 30/Aug/21 15:30
            Start Date: 30/Aug/21 15:30
    Worklog Time Spent: 10m 
      Work Description: franz1981 commented on a change in pull request #3711:
URL: https://github.com/apache/activemq-artemis/pull/3711#discussion_r698588296



##########
File path: 
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
##########
@@ -587,43 +587,48 @@ void deliver() {
          LargeBodyReader context = message.getLargeBodyReader();
          try {
             context.open();
+            final ByteBuf tmpFrameBuf = 
PooledByteBufAllocator.DEFAULT.directBuffer(frameSize);
+            final NettyReadable nettyReadable = new NettyReadable(tmpFrameBuf);
             try {
+
                context.position(position);
                long bodySize = context.getSize();
-
-               ByteBuffer buf = ByteBuffer.allocate(frameSize);
+               // materialize it so we can use its internal NIO buffer
+               tmpFrameBuf.ensureWritable(frameSize);
 
                for (; sender.getLocalState() != EndpointState.CLOSED && 
position < bodySize; ) {
                   if (!connection.flowControl(this::resume)) {
                      context.close();
                      return;
                   }
-                  buf.clear();
-                  int size = 0;
-
-                  try {
-                     if (position == 0) {
-                        replaceInitialHeader(deliveryAnnotationsToEncode, 
context, WritableBuffer.ByteBufferWrapper.wrap(buf));
-                     }
-                     size = context.readInto(buf);
-
-                     sender.send(new ReadableBuffer.ByteBufferReader(buf));
-                     position += size;
-                  } catch (java.nio.BufferOverflowException overflowException) 
{
-                     if (position == 0) {
-                        if (log.isDebugEnabled()) {
-                           log.debug("Delivery of message failed with an 
overFlowException, retrying again with expandable buffer");
-                        }
-                        // on the very first packet, if the initial header was 
replaced with a much bigger header (re-encoding)
-                        // we could recover the situation with a retry using 
an expandable buffer.
-                        // this is tested on 
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest
-                        size = 
retryInitialPacketWithExpandableBuffer(deliveryAnnotationsToEncode, context, 
buf);
-                     } else {
-                        // if this is not the position 0, something is going on
-                        // we just forward the exception as this is not 
supposed to happen
-                        throw overflowException;
+                  // using internalNioBuffer to save creating a new ByteBuffer 
duplicate/slice/view in the loop
+                  ByteBuffer nioBuffer = tmpFrameBuf.internalNioBuffer(0, 
frameSize);
+                  int bufPosition = nioBuffer.position();
+                  tmpFrameBuf.clear();
+                  final int writtenBytes;
+                  if (position == 0) {
+                     // no need to cache NettyWritable: position should be 0 
just once per large message file
+                     replaceInitialHeader(deliveryAnnotationsToEncode, 
context, new NettyWritable(tmpFrameBuf));
+                     writtenBytes = tmpFrameBuf.writerIndex();
+                     // tested on 
org.apache.activemq.artemis.tests.integration.amqp.AmqpMessageDivertsTest:
+                     // tmpFrameBuf can grow over the initial capacity
+                     if (nioBuffer.remaining() < writtenBytes) {
+                        // ensure reading at least frameSize from the file
+                        tmpFrameBuf.ensureWritable(frameSize);
+                        // refresh internal NIO buffer: the previous one is no 
longer valid
+                        nioBuffer = tmpFrameBuf.internalNioBuffer(0, 
writtenBytes + frameSize);
+                        bufPosition = nioBuffer.position();

Review comment:
       Let me see if I can simplify the code path here to make it simpler then, 
I see that looks too tricky and can be easily changed causing bugs




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 643526)
    Time Spent: 2.5h  (was: 2h 20m)

> Speedup AMQP large message streaming
> ------------------------------------
>
>                 Key: ARTEMIS-3449
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3449
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>            Reporter: Francesco Nigro
>            Assignee: Francesco Nigro
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> AMQP is using unpooled heap ByteBuffer(s) to stream AMQP large messages: 
> given that the underline NIO sequential file can both use FileChannel or 
> RandomAccessFile (depending if the ByteBuffer used is direct/heap based), 
> both approaches would benefit from using Netty pooled direct buffers and save 
> additional copies (performed by RandomAccessFile) to happen, reducing GC too.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to