This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ea264f3c37eb1623dcb02aa6b052d29872688b8e
Author: Matthieu Baechler <[email protected]>
AuthorDate: Wed Jan 15 18:34:28 2020 +0100

    JAMES-3030 make Reactor toInputStream 50 times faster
    
        From 500 MiB/s to 10 GiB/s on my workstation
    
        It's due to the implementation of read(byte[], int off, int len)
        and some minor optimizations
---
 .../java/org/apache/james/util/ReactorUtils.java   | 77 ++++++++++------------
 .../org/apache/james/util/ReactorUtilsTest.java    | 67 +++++++++++++++----
 2 files changed, 87 insertions(+), 57 deletions(-)

diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
index df51e07..dd9ceba 100644
--- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
+++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java
@@ -21,9 +21,8 @@ package org.apache.james.util;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 import java.util.Optional;
-import java.util.Spliterator;
-import java.util.stream.Stream;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -33,66 +32,56 @@ public class ReactorUtils {
         return Mono.fromRunnable(runnable).then(Mono.empty());
     }
 
+
     public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) {
-        return new StreamInputStream(byteArrays.toStream(1));
+        return new StreamInputStream(byteArrays.toIterable(1).iterator());
     }
 
-    private static  class StreamInputStream extends InputStream {
+    private static class StreamInputStream extends InputStream {
         private static final int NO_MORE_DATA = -1;
 
-        private final Stream<ByteBuffer> source;
-        private final Spliterator<ByteBuffer> spliterator;
+        private final Iterator<ByteBuffer> source;
         private Optional<ByteBuffer> currentItemByteStream;
 
-        StreamInputStream(Stream<ByteBuffer> source) {
+        StreamInputStream(Iterator<ByteBuffer> source) {
             this.source = source;
-            this.spliterator = source.spliterator();
             this.currentItemByteStream = Optional.empty();
         }
 
         @Override
-        public int read() {
-            try {
-                if (!dataAvailableToRead()) {
-                    switchToNextChunk();
-                }
-
-                if (!dataAvailableToRead()) {
-                    source.close();
-                    return NO_MORE_DATA;
-                }
-
-                return currentItemByteStream
-                    .filter(ByteBuffer::hasRemaining)
-                    .map(buffer -> buffer.get() & 0xFF)
-                    .orElseGet(this::readNextChunk);
-            } catch (Throwable t) {
-                source.close();
-                throw t;
-            }
+        public int read(byte[] b, int off, int len) throws IOException {
+            return nextNonEmptyBuffer()
+                .map(buffer -> {
+                    int toRead = Math.min(len, buffer.remaining());
+                    buffer.get(b, off, toRead);
+                    return toRead;
+                })
+                .orElse(NO_MORE_DATA);
         }
 
-        private boolean dataAvailableToRead() {
-            return currentItemByteStream.isPresent();
+        @Override
+        public int read() {
+            return nextNonEmptyBuffer()
+                .map(ReactorUtils::byteToInt)
+                .orElse(NO_MORE_DATA);
         }
 
-        private void switchToNextChunk() {
-            spliterator.tryAdvance(bytes ->
-                currentItemByteStream = Optional.of(bytes));
+        private Optional<ByteBuffer> nextNonEmptyBuffer() {
+            Boolean needsNewBuffer = currentItemByteStream.map(buffer -> !buffer.hasRemaining()).orElse(true);
+            if (needsNewBuffer) {
+                if (source.hasNext()) {
+                    currentItemByteStream = Optional.of(source.next());
+                    return nextNonEmptyBuffer();
+                } else {
+                    return Optional.empty();
+                }
+            }
+            return currentItemByteStream;
         }
 
-        private Integer readNextChunk() {
-            currentItemByteStream = Optional.empty();
-            return read();
-        }
+    }
 
-        @Override
-        public void close() throws IOException {
-            try {
-                source.close();
-            } finally {
-                super.close();
-            }
-        }
+    private static int byteToInt(ByteBuffer buffer) {
+        return buffer.get() & 0xff;
     }
 }
diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
index 7bdc678..c01d09a 100644
--- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
+++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java
@@ -20,18 +20,21 @@ package org.apache.james.util;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.time.Duration;
+import java.nio.charset.StandardCharsets;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 
+import com.google.common.primitives.Bytes;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Scheduler;
 import reactor.core.scheduler.Schedulers;
 
 class ReactorUtilsTest {
@@ -81,6 +84,37 @@ class ReactorUtilsTest {
 
     @Nested
     class ToInputStream {
+
+        @Test
+        void givenAFluxOf3BytesShouldReadSuccessfullyTheWholeSource() throws IOException, InterruptedException {
+            byte[] bytes = "foo bar ...".getBytes(StandardCharsets.US_ASCII);
+
+            Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes))
+                .window(3)
+                .flatMapSequential(Flux::collectList)
+                .map(Bytes::toArray)
+                .map(ByteBuffer::wrap);
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+
+            assertThat(inputStream).hasSameContentAs(new ByteArrayInputStream(bytes));
+        }
+
+        @Test
+        void givenALongFluxBytesShouldReadSuccessfullyTheWholeSource() throws IOException, InterruptedException {
+            byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII);
+
+            Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes))
+                .window(3)
+                .flatMapSequential(Flux::collectList)
+                .map(Bytes::toArray)
+                .map(ByteBuffer::wrap);
+
+            InputStream inputStream = ReactorUtils.toInputStream(source);
+
+            assertThat(inputStream).hasSameContentAs(new ByteArrayInputStream(bytes));
+        }
+
         @Test
         void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
@@ -92,10 +126,10 @@ class ReactorUtilsTest {
                 .map(ByteBuffer::wrap);
 
             InputStream inputStream = ReactorUtils.toInputStream(source);
-            byte[] readBytes = new byte[5];
-            inputStream.read(readBytes, 0, readBytes.length);
+            byte[] readBytes = IOUtils.readFully(inputStream, 5);
 
             assertThat(readBytes).contains(0, 1, 2, 3, 4);
+            //make sure reactor is done with prefetch
             Thread.sleep(200);
             assertThat(generateElements.get()).isEqualTo(6);
         }
@@ -103,17 +137,20 @@ class ReactorUtilsTest {
         @Test
         void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
-            Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8})
+            Flux<ByteBuffer> source = Flux.just(
+                new byte[] {0, 1, 2},
+                new byte[] {3, 4, 5},
+                new byte[] {6, 7, 8})
                     .subscribeOn(Schedulers.elastic())
                     .map(ByteBuffer::wrap)
                     .limitRate(2)
                     .doOnRequest(request -> generateElements.getAndAdd((int) request));
 
             InputStream inputStream = ReactorUtils.toInputStream(source);
-            byte[] readBytes = new byte[5];
-            inputStream.read(readBytes, 0, readBytes.length);
+            byte[] readBytes = IOUtils.readFully(inputStream, 5);
 
             assertThat(readBytes).contains(0, 1, 2, 3, 4);
+            //make sure reactor is done with prefetch
             Thread.sleep(200);
             assertThat(generateElements.get()).isEqualTo(3);
         }
@@ -121,19 +158,22 @@ class ReactorUtilsTest {
         @Test
         void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException {
             AtomicInteger generateElements = new AtomicInteger(0);
-            Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11})
+            Flux<ByteBuffer> source = Flux.just(
+                new byte[] {0, 1, 2},
+                new byte[] {},
+                new byte[] {3, 4, 5},
+                new byte[] {6, 7, 8},
+                new byte[] {9, 10, 11})
                     .subscribeOn(Schedulers.elastic())
                     .map(ByteBuffer::wrap)
                     .limitRate(2)
                     .doOnRequest(request -> generateElements.getAndAdd((int) request));
 
             InputStream inputStream = ReactorUtils.toInputStream(source);
-            byte[] readBytes = new byte[5];
-            inputStream.read(readBytes, 0, readBytes.length);
+            IOUtils.readFully(inputStream, 5);
 
-            assertThat(readBytes).contains(0, 1, 2, 3, 4);
-            Thread.sleep(200);
-            assertThat(generateElements.get()).isEqualTo(4);
+            byte[] readBytesBis = IOUtils.readFully(inputStream, 2);
+            assertThat(readBytesBis).contains(5,6);
         }
 
         @Test
@@ -150,6 +190,7 @@ class ReactorUtilsTest {
             inputStream.read(readBytes, 0, readBytes.length);
 
             assertThat(readBytes).contains(0, 0, 0, 0, 0);
+            //make sure reactor is done with prefetch
             Thread.sleep(200);
             assertThat(generateElements.get()).isEqualTo(1);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to