This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ea264f3c37eb1623dcb02aa6b052d29872688b8e Author: Matthieu Baechler <[email protected]> AuthorDate: Wed Jan 15 18:34:28 2020 +0100 JAMES-3030 make Reactor toInputStream 50 times faster From 500 MiB/s to 10 GiB/s on my workstation It's due to the implementation of read(byte[], int off, int len) and some minor optimizations --- .../java/org/apache/james/util/ReactorUtils.java | 77 ++++++++++------------ .../org/apache/james/util/ReactorUtilsTest.java | 67 +++++++++++++++---- 2 files changed, 87 insertions(+), 57 deletions(-) diff --git a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java index df51e07..dd9ceba 100644 --- a/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java +++ b/server/container/util/src/main/java/org/apache/james/util/ReactorUtils.java @@ -21,9 +21,8 @@ package org.apache.james.util; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Iterator; import java.util.Optional; -import java.util.Spliterator; -import java.util.stream.Stream; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -33,66 +32,56 @@ public class ReactorUtils { return Mono.fromRunnable(runnable).then(Mono.empty()); } + public static InputStream toInputStream(Flux<ByteBuffer> byteArrays) { - return new StreamInputStream(byteArrays.toStream(1)); + return new StreamInputStream(byteArrays.toIterable(1).iterator()); } - private static class StreamInputStream extends InputStream { + private static class StreamInputStream extends InputStream { private static final int NO_MORE_DATA = -1; - private final Stream<ByteBuffer> source; - private final Spliterator<ByteBuffer> spliterator; + private final Iterator<ByteBuffer> source; private Optional<ByteBuffer> currentItemByteStream; - StreamInputStream(Stream<ByteBuffer> source) { + StreamInputStream(Iterator<ByteBuffer> source) { this.source = source; - this.spliterator = source.spliterator(); this.currentItemByteStream = Optional.empty(); } @Override - public int read() { - try { - if (!dataAvailableToRead()) { - switchToNextChunk(); - } - - if (!dataAvailableToRead()) { - source.close(); - return NO_MORE_DATA; - } - - return currentItemByteStream - .filter(ByteBuffer::hasRemaining) - .map(buffer -> buffer.get() & 0xFF) - .orElseGet(this::readNextChunk); - } catch (Throwable t) { - source.close(); - throw t; - } + public int read(byte[] b, int off, int len) throws IOException { + return nextNonEmptyBuffer() + .map(buffer -> { + int toRead = Math.min(len, buffer.remaining()); + buffer.get(b, off, toRead); + return toRead; + }) + .orElse(NO_MORE_DATA); } - private boolean dataAvailableToRead() { - return currentItemByteStream.isPresent(); + @Override + public int read() { + return nextNonEmptyBuffer() + .map(ReactorUtils::byteToInt) + .orElse(NO_MORE_DATA); } - private void switchToNextChunk() { - spliterator.tryAdvance(bytes -> - currentItemByteStream = Optional.of(bytes)); + private Optional<ByteBuffer> nextNonEmptyBuffer() { + Boolean needsNewBuffer = currentItemByteStream.map(buffer -> !buffer.hasRemaining()).orElse(true); + if (needsNewBuffer) { + if (source.hasNext()) { + currentItemByteStream = Optional.of(source.next()); + return nextNonEmptyBuffer(); + } else { + return Optional.empty(); + } + } + return currentItemByteStream; } - private Integer readNextChunk() { - currentItemByteStream = Optional.empty(); - return read(); - } + } - @Override - public void close() throws IOException { - try { - source.close(); - } finally { - super.close(); - } - } + private static int byteToInt(ByteBuffer buffer) { + return buffer.get() & 0xff; } } diff --git a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java index 7bdc678..c01d09a 100644 --- a/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/ReactorUtilsTest.java @@ -20,18 +20,21 @@ package org.apache.james.util; import static org.assertj.core.api.Assertions.assertThat; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.time.Duration; +import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; +import com.google.common.primitives.Bytes; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; class ReactorUtilsTest { @@ -81,6 +84,37 @@ class ReactorUtilsTest { @Nested class ToInputStream { + + @Test + void givenAFluxOf3BytesShouldReadSuccessfullyTheWholeSource() throws IOException, InterruptedException { + byte[] bytes = "foo bar ...".getBytes(StandardCharsets.US_ASCII); + + Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes)) + .window(3) + .flatMapSequential(Flux::collectList) + .map(Bytes::toArray) + .map(ByteBuffer::wrap); + + InputStream inputStream = ReactorUtils.toInputStream(source); + + assertThat(inputStream).hasSameContentAs(new ByteArrayInputStream(bytes)); + } + + @Test + void givenALongFluxBytesShouldReadSuccessfullyTheWholeSource() throws IOException, InterruptedException { + byte[] bytes = RandomStringUtils.randomAlphabetic(41111).getBytes(StandardCharsets.US_ASCII); + + Flux<ByteBuffer> source = Flux.fromIterable(Bytes.asList(bytes)) + .window(3) + .flatMapSequential(Flux::collectList) + .map(Bytes::toArray) + .map(ByteBuffer::wrap); + + InputStream inputStream = ReactorUtils.toInputStream(source); + + assertThat(inputStream).hasSameContentAs(new ByteArrayInputStream(bytes)); + } + @Test void givenAFluxOnOneByteShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); @@ -92,10 +126,10 @@ class ReactorUtilsTest { .map(ByteBuffer::wrap); InputStream inputStream = ReactorUtils.toInputStream(source); - byte[] readBytes = new byte[5]; - inputStream.read(readBytes, 0, readBytes.length); + byte[] readBytes = IOUtils.readFully(inputStream, 5); assertThat(readBytes).contains(0, 1, 2, 3, 4); + //make sure reactor is done with prefetch Thread.sleep(200); assertThat(generateElements.get()).isEqualTo(6); } @@ -103,17 +137,20 @@ class ReactorUtilsTest { @Test void givenAFluxOf3BytesShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); - Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}) + Flux<ByteBuffer> source = Flux.just( + new byte[] {0, 1, 2}, + new byte[] {3, 4, 5}, + new byte[] {6, 7, 8}) .subscribeOn(Schedulers.elastic()) .map(ByteBuffer::wrap) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)); InputStream inputStream = ReactorUtils.toInputStream(source); - byte[] readBytes = new byte[5]; - inputStream.read(readBytes, 0, readBytes.length); + byte[] readBytes = IOUtils.readFully(inputStream, 5); assertThat(readBytes).contains(0, 1, 2, 3, 4); + //make sure reactor is done with prefetch Thread.sleep(200); assertThat(generateElements.get()).isEqualTo(3); } @@ -121,19 +158,22 @@ class ReactorUtilsTest { @Test void givenAFluxOf3BytesWithAnEmptyByteArrayShouldConsumeOnlyTheReadBytesAndThePrefetch() throws IOException, InterruptedException { AtomicInteger generateElements = new AtomicInteger(0); - Flux<ByteBuffer> source = Flux.just(new byte[] {0, 1, 2}, new byte[] {}, new byte[] {3, 4, 5}, new byte[] {6, 7, 8}, new byte[] {9, 10, 11}) + Flux<ByteBuffer> source = Flux.just( + new byte[] {0, 1, 2}, + new byte[] {}, + new byte[] {3, 4, 5}, + new byte[] {6, 7, 8}, + new byte[] {9, 10, 11}) .subscribeOn(Schedulers.elastic()) .map(ByteBuffer::wrap) .limitRate(2) .doOnRequest(request -> generateElements.getAndAdd((int) request)); InputStream inputStream = ReactorUtils.toInputStream(source); - byte[] readBytes = new byte[5]; - inputStream.read(readBytes, 0, readBytes.length); + IOUtils.readFully(inputStream, 5); - assertThat(readBytes).contains(0, 1, 2, 3, 4); - Thread.sleep(200); - assertThat(generateElements.get()).isEqualTo(4); + byte[] readBytesBis = IOUtils.readFully(inputStream, 2); + assertThat(readBytesBis).contains(5,6); } @Test @@ -150,6 +190,7 @@ class ReactorUtilsTest { inputStream.read(readBytes, 0, readBytes.length); assertThat(readBytes).contains(0, 0, 0, 0, 0); + //make sure reactor is done with prefetch Thread.sleep(200); assertThat(generateElements.get()).isEqualTo(1); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
