asfgit closed pull request #51: WAGON-537 Maven transfer speed of large 
artifacts is slow
URL: https://github.com/apache/maven-wagon/pull/51
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java 
b/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java
index 4cbf37d7..361390a4 100644
--- a/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java
+++ b/wagon-provider-api/src/main/java/org/apache/maven/wagon/AbstractWagon.java
@@ -42,8 +42,14 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 import java.util.List;
 
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
 /**
  * Implementation of common facilities for Wagon providers.
  *
@@ -53,6 +59,24 @@
     implements Wagon
 {
     protected static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
+    protected static final int MAXIMUM_BUFFER_SIZE = 1024 * 512;
+
+    /**
+     * To efficiently buffer data, use a multiple of 4k
+     * as this is likely to match the hardware buffer size of certain
+     * storage devices.
+     */
+    protected static final int BUFFER_SEGMENT_SIZE = 4 * 1024;
+
+    /**
+     * The desired minimum amount of chunks in which a {@link Resource} shall 
be
+     * {@link #transfer(Resource, InputStream, OutputStream, int, long) 
transferred}.
+     * This corresponds to the minimum number of times {@link 
#fireTransferProgress(TransferEvent, byte[], int)} is called.
+     * 100 notifications is a conservative value that will lead to small 
chunks for
+     * any artifact less than {@link #BUFFER_SEGMENT_SIZE} * {@link 
#MINIMUM_AMOUNT_OF_TRANSFER_CHUNKS}
+     * in size.
+     */
+    protected static final int MINIMUM_AMOUNT_OF_TRANSFER_CHUNKS = 100;
 
     protected Repository repository;
 
@@ -560,31 +584,74 @@ protected void transfer( Resource resource, InputStream 
input, OutputStream outp
     protected void transfer( Resource resource, InputStream input, 
OutputStream output, int requestType, long maxSize )
         throws IOException
     {
-        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
+
+        ByteBuffer buffer = ByteBuffer.allocate( 
getBufferCapacityForTransferring( resource.getContentLength() ) );
+        int halfBufferCapacity = buffer.capacity() / 2;
 
         TransferEvent transferEvent = new TransferEvent( this, resource, 
TransferEvent.TRANSFER_PROGRESS, requestType );
         transferEvent.setTimestamp( System.currentTimeMillis() );
 
+        ReadableByteChannel in = Channels.newChannel( input );
+
         long remaining = maxSize;
         while ( remaining > 0 )
         {
-            // let's safely cast to int because the min value will be lower 
than the buffer size.
-            int n = input.read( buffer, 0, (int) Math.min( buffer.length, 
remaining ) );
+            int read = in.read( buffer );
 
-            if ( n == -1 )
+            if ( read == -1 )
             {
+                // EOF, but some data has not been written yet.
+                if ( buffer.position() != 0 )
+                {
+                    buffer.flip();
+                    fireTransferProgress( transferEvent, buffer.array(), 
buffer.limit() );
+                    output.write( buffer.array(), 0, buffer.limit() );
+                }
+
                 break;
             }
 
-            fireTransferProgress( transferEvent, buffer, n );
-
-            output.write( buffer, 0, n );
+            // Prevent minichunking / fragmentation: when less than half the 
buffer is utilized,
+            // read some more bytes before writing and firing progress.
+            if ( buffer.position() < halfBufferCapacity  )
+            {
+                continue;
+            }
 
-            remaining -= n;
+            buffer.flip();
+            fireTransferProgress( transferEvent, buffer.array(), 
buffer.limit() );
+            output.write( buffer.array(), 0, buffer.limit() );
+            remaining -= buffer.limit();
+            buffer.clear();
         }
         output.flush();
     }
 
+    /**
+     * Provides a buffer size for efficiently transferring the given amount of 
bytes, such that
+     * it is not fragmented into too many chunks. For larger files, larger 
buffers are provided such that downstream
+     * {@link #fireTransferProgress(TransferEvent, byte[], int) listeners} are 
not notified overly frequently.
+     * For instance, transferring gigabyte-sized resources would result in 
millions of notifications when using
+     * only a few kilobytes of buffer, drastically slowing transfer since 
transfer progress listeners and
+     * notifications are synchronous and may block, e.g. when writing download 
progress status to console.
+     *
+     * @param numberOfBytes can be 0 or less, in which case a default buffer 
size is used.
+     * @return a buffer capacity in bytes suitable for transferring the given amount of 
bytes without too many chunks.
+     */
+    protected int getBufferCapacityForTransferring(long numberOfBytes )
+    {
+        if ( numberOfBytes <= 0 )
+        {
+            return DEFAULT_BUFFER_SIZE;
+        }
+
+        final int numberOfBufferSegments = ( ( int ) (
+            numberOfBytes / ( BUFFER_SEGMENT_SIZE * 
MINIMUM_AMOUNT_OF_TRANSFER_CHUNKS ) )
+        );
+        final int potentialBufferSize = numberOfBufferSegments * 
BUFFER_SEGMENT_SIZE;
+        return min( MAXIMUM_BUFFER_SIZE, max( DEFAULT_BUFFER_SIZE, 
potentialBufferSize ) );
+    }
+
     // ----------------------------------------------------------------------
     //
     // ----------------------------------------------------------------------
diff --git 
a/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java
 
b/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java
index 9f294f7e..d87cc857 100755
--- 
a/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java
+++ 
b/wagon-providers/wagon-http-shared/src/main/java/org/apache/maven/wagon/shared/http/AbstractHttpClientWagon.java
@@ -84,6 +84,10 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -105,9 +109,6 @@
     private final class RequestEntityImplementation
         extends AbstractHttpEntity
     {
-
-        private static final int BUFFER_SIZE = 2048;
-
         private final Resource resource;
 
         private final Wagon wagon;
@@ -170,42 +171,47 @@ public void writeTo( final OutputStream outputStream )
             TransferEvent transferEvent =
                 new TransferEvent( wagon, resource, 
TransferEvent.TRANSFER_PROGRESS, TransferEvent.REQUEST_PUT );
             transferEvent.setTimestamp( System.currentTimeMillis() );
-            InputStream instream = ( this.source != null )
-                ? new FileInputStream( this.source )
-                : stream;
-            try
+
+            try ( ReadableByteChannel input = ( this.source != null )
+                    ? new RandomAccessFile( this.source, "r" ).getChannel()
+                    : Channels.newChannel( stream ) )
             {
-                byte[] buffer = new byte[BUFFER_SIZE];
-                int l;
-                if ( this.length < 0 )
-                {
-                    // until EOF
-                    while ( ( l = instream.read( buffer ) ) != -1 )
-                    {
-                        fireTransferProgress( transferEvent, buffer, -1 );
-                        outputStream.write( buffer, 0, l );
-                    }
-                }
-                else
+                ByteBuffer buffer = ByteBuffer.allocate( 
getBufferCapacityForTransferring( this.length ) );
+                int halfBufferCapacity = buffer.capacity() / 2;
+
+                long remaining = this.length < 0 ? Long.MAX_VALUE : 
this.length;
+                while ( remaining > 0 )
                 {
-                    // no need to consume more than length
-                    long remaining = this.length;
-                    while ( remaining > 0 )
+                    int read = input.read( buffer );
+                    if ( read == -1 )
                     {
-                        l = instream.read( buffer, 0, (int) Math.min( 
BUFFER_SIZE, remaining ) );
-                        if ( l == -1 )
+                        // EOF, but some data has not been written yet.
+                        if ( buffer.position() != 0 )
                         {
-                            break;
+                            buffer.flip();
+                            fireTransferProgress( transferEvent, 
buffer.array(), buffer.limit() );
+                            outputStream.write( buffer.array(), 0, 
buffer.limit() );
+                            buffer.clear();
                         }
-                        fireTransferProgress( transferEvent, buffer, (int) 
Math.min( BUFFER_SIZE, remaining ) );
-                        outputStream.write( buffer, 0, l );
-                        remaining -= l;
+
+                        break;
                     }
+
+                    // Prevent minichunking / fragmentation: when less than 
half the buffer is utilized,
+                    // read some more bytes before writing and firing progress.
+                    if ( buffer.position() < halfBufferCapacity )
+                    {
+                        continue;
+                    }
+
+                    buffer.flip();
+                    fireTransferProgress( transferEvent, buffer.array(), 
buffer.limit() );
+                    outputStream.write( buffer.array(), 0, buffer.limit() );
+                    remaining -= buffer.limit();
+                    buffer.clear();
+
                 }
-            }
-            finally
-            {
-                instream.close();
+                outputStream.flush();
             }
         }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to