This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3066696d675258f3970f85f15b36f64f6aefbd34 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Tue Apr 21 17:28:21 2020 +0700 JAMES-3140 Correct usage of push back input stream --- .../james/blob/cassandra/cache/CacheBlobStore.java | 67 ++++++++++++---------- .../cassandra/cache/BlobStoreCacheContract.java | 12 ++++ .../blob/cassandra/cache/CacheBlobStoreTest.java | 42 +++++++++++++- 3 files changed, 88 insertions(+), 33 deletions(-) diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java index 227ddca..fdddc45 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java @@ -24,11 +24,12 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.PushbackInputStream; +import java.util.Arrays; +import java.util.Optional; import javax.inject.Inject; import javax.inject.Named; -import org.apache.commons.io.IOUtils; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; import org.apache.james.blob.api.BucketName; @@ -67,8 +68,7 @@ public class CacheBlobStore implements BlobStore { .filter(defaultBucket::equals) .flatMap(ignored -> Mono.from(cache.read(blobId)) - .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes))) - ) + .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes)))) .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId))) .blockOptional() .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId))); @@ -97,18 +97,23 @@ public class CacheBlobStore implements BlobStore { public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) { Preconditions.checkNotNull(inputStream, "InputStream must not be null"); + if (isAbleToCache(bucketName, storagePolicy)) { + return saveInCache(bucketName, inputStream, storagePolicy); + } + + return backend.save(bucketName, inputStream, storagePolicy); + } + + private Publisher<BlobId> saveInCache(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) { PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, sizeThresholdInBytes + 1); - return Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy)) - .flatMap(blobId -> - Mono.fromCallable(() -> isALargeStream(pushbackInputStream)) - .flatMap(largeStream -> { - if (!largeStream) { - return Mono.from(saveInCache(bucketName, blobId, pushbackInputStream, storagePolicy)) - .thenReturn(blobId); - } - return Mono.just(blobId); - }) - ); + + return Mono.fromCallable(() -> fullyReadSmallStream(pushbackInputStream)) + .flatMap(Mono::justOrEmpty) + .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy)) + .flatMap(bytes -> Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy)) + .flatMap(blobId -> Mono.from(cache.cache(blobId, bytes)) + .thenReturn(blobId))) + .switchIfEmpty(Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy))); } @Override @@ -130,23 +135,23 @@ public class CacheBlobStore implements BlobStore { return Mono.from(backend.deleteBucket(bucketName)); } - private Mono<Void> saveInCache(BucketName bucketName, BlobId blobId, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) { - return Mono.fromCallable(() -> copyBytesFromStream(pushbackInputStream)) - .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy)) - .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes))); - } - - private byte[] copyBytesFromStream(PushbackInputStream pushbackInputStream) throws IOException { - byte[] bytes = new byte[pushbackInputStream.available()]; - int read = IOUtils.read(pushbackInputStream, bytes); - pushbackInputStream.unread(read); - return bytes; - } - - private boolean isALargeStream(PushbackInputStream pushbackInputStream) throws IOException { - long skip = pushbackInputStream.skip(sizeThresholdInBytes + 1); - pushbackInputStream.unread(Math.toIntExact(skip)); - return skip >= sizeThresholdInBytes; + private Optional<byte[]> fullyReadSmallStream(PushbackInputStream pushbackInputStream) throws IOException { + int sizeToRead = sizeThresholdInBytes + 1; + byte[] bytes = new byte[sizeToRead]; + int readByteCount = pushbackInputStream.read(bytes, 0, sizeToRead); + try { + if (readByteCount > sizeThresholdInBytes) { + return Optional.empty(); + } + if (readByteCount < 0) { + return Optional.of(new byte[] {}); + } + return Optional.of(Arrays.copyOf(bytes, readByteCount)); + } finally { + if (readByteCount > 0) { + pushbackInputStream.unread(bytes, 0, readByteCount); + } + } } /** diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java index 6819aca..8d22d3f 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java @@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.awaitility.Awaitility.await; +import java.io.ByteArrayInputStream; import java.nio.charset.StandardCharsets; import java.util.Optional; @@ -107,4 +108,15 @@ public interface BlobStoreCacheContract { await().atMost(Duration.TWO_SECONDS.plus(Duration.FIVE_HUNDRED_MILLISECONDS)).await().untilAsserted(() -> assertThat(Mono.from(testee().read(blobId)).blockOptional()).isEmpty()); } + + @Test + default void readShouldReturnEmptyCachedByteArray() { + BlobId blobId = blobIdFactory().randomId(); + byte[] emptyByteArray = new byte[] {}; + + Mono.from(testee().cache(blobId, emptyByteArray)).block(); + + assertThat(new ByteArrayInputStream(Mono.from(testee().read(blobId)).block())) + .hasSameContentAs(new ByteArrayInputStream(emptyByteArray)); + } } diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java index 1c5ff09..a7808ab 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java @@ -28,6 +28,7 @@ import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; @@ -45,12 +46,15 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import com.google.common.base.Strings; + import reactor.core.publisher.Mono; public class CacheBlobStoreTest implements BlobStoreContract { private static final BucketName DEFAULT_BUCKERNAME = DEFAULT; private static final BucketName TEST_BUCKERNAME = BucketName.of("test"); + byte[] APPROXIMATELY_FIVE_KILOBYTES = Strings.repeat("0123456789\n", 500).getBytes(StandardCharsets.UTF_8); @RegisterExtension static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension( @@ -139,16 +143,50 @@ public class CacheBlobStoreTest implements BlobStoreContract { } @Test - public void shouldNotCacheWhenEmptyStream() { + public void shouldCacheWhenEmptyStream() { BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(EMPTY_BYTEARRAY), SIZE_BASED)).block(); SoftAssertions.assertSoftly(soflty -> { - assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); + assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY)); assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY); }); } @Test + public void shouldNotCacheWhenEmptyByteArray() { + BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EMPTY_BYTEARRAY, SIZE_BASED)).block(); + + SoftAssertions.assertSoftly(soflty -> { + assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY)); + assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY); + }); + } + + @Test + public void shouldCacheWhenFiveKilobytesSteam() { + BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), SIZE_BASED)).block(); + + SoftAssertions.assertSoftly(soflty -> { + assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())) + .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES)); + assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block())) + .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES)); + }); + } + + @Test + public void shouldCacheWhenFiveKilobytesBytes() { + BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); + + SoftAssertions.assertSoftly(soflty -> { + assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())) + .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES)); + assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block())) + .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES)); + }); + } + + @Test public void shouldRemoveBothInCacheAndBackendWhenDefaultBucketName() { BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block(); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org