This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3066696d675258f3970f85f15b36f64f6aefbd34
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Tue Apr 21 17:28:21 2020 +0700

    JAMES-3140 Correct usage of PushbackInputStream
---
 .../james/blob/cassandra/cache/CacheBlobStore.java | 67 ++++++++++++----------
 .../cassandra/cache/BlobStoreCacheContract.java    | 12 ++++
 .../blob/cassandra/cache/CacheBlobStoreTest.java   | 42 +++++++++++++-
 3 files changed, 88 insertions(+), 33 deletions(-)

diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
index 227ddca..fdddc45 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
@@ -24,11 +24,12 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PushbackInputStream;
+import java.util.Arrays;
+import java.util.Optional;
 
 import javax.inject.Inject;
 import javax.inject.Named;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.blob.api.BucketName;
@@ -67,8 +68,7 @@ public class CacheBlobStore implements BlobStore {
             .filter(defaultBucket::equals)
             .flatMap(ignored ->
                 Mono.from(cache.read(blobId))
-                    .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new 
ByteArrayInputStream(bytes)))
-            )
+                    .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new 
ByteArrayInputStream(bytes))))
             .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, 
blobId)))
             .blockOptional()
             .orElseThrow(() -> new 
ObjectNotFoundException(String.format("Could not retrieve blob metadata for 
%s", blobId)));
@@ -97,18 +97,23 @@ public class CacheBlobStore implements BlobStore {
     public Publisher<BlobId> save(BucketName bucketName, InputStream 
inputStream, StoragePolicy storagePolicy) {
         Preconditions.checkNotNull(inputStream, "InputStream must not be 
null");
 
+        if (isAbleToCache(bucketName, storagePolicy)) {
+            return saveInCache(bucketName, inputStream, storagePolicy);
+        }
+
+        return backend.save(bucketName, inputStream, storagePolicy);
+    }
+
+    private Publisher<BlobId> saveInCache(BucketName bucketName, InputStream 
inputStream, StoragePolicy storagePolicy) {
         PushbackInputStream pushbackInputStream = new 
PushbackInputStream(inputStream, sizeThresholdInBytes + 1);
-        return Mono.from(backend.save(bucketName, pushbackInputStream, 
storagePolicy))
-            .flatMap(blobId ->
-                Mono.fromCallable(() -> isALargeStream(pushbackInputStream))
-                    .flatMap(largeStream -> {
-                        if (!largeStream) {
-                            return Mono.from(saveInCache(bucketName, blobId, 
pushbackInputStream, storagePolicy))
-                                .thenReturn(blobId);
-                        }
-                        return Mono.just(blobId);
-                    })
-            );
+
+        return Mono.fromCallable(() -> 
fullyReadSmallStream(pushbackInputStream))
+            .flatMap(Mono::justOrEmpty)
+            .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
+            .flatMap(bytes -> Mono.from(backend.save(bucketName, 
pushbackInputStream, storagePolicy))
+                .flatMap(blobId -> Mono.from(cache.cache(blobId, bytes))
+                    .thenReturn(blobId)))
+            .switchIfEmpty(Mono.defer(() -> Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy)))); // deferred: backend.save must only run after the stream was probed, and only when this fallback is used
     }
 
     @Override
@@ -130,23 +135,33 @@ public class CacheBlobStore implements BlobStore {
         return Mono.from(backend.deleteBucket(bucketName));
     }
 
-    private Mono<Void> saveInCache(BucketName bucketName, BlobId blobId, 
PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
-        return Mono.fromCallable(() -> 
copyBytesFromStream(pushbackInputStream))
-            .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
-            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
-    }
-
-    private byte[] copyBytesFromStream(PushbackInputStream 
pushbackInputStream) throws IOException {
-        byte[] bytes = new byte[pushbackInputStream.available()];
-        int read = IOUtils.read(pushbackInputStream, bytes);
-        pushbackInputStream.unread(read);
-        return bytes;
-    }
-
-    private boolean isALargeStream(PushbackInputStream pushbackInputStream) 
throws IOException {
-        long skip = pushbackInputStream.skip(sizeThresholdInBytes + 1);
-        pushbackInputStream.unread(Math.toIntExact(skip));
-        return skip >= sizeThresholdInBytes;
+    /**
+     * Reads at most sizeThresholdInBytes + 1 bytes, then pushes whatever was read back into the stream
+     * so that it can still be consumed from its beginning.
+     *
+     * @return the whole content when the stream holds at most sizeThresholdInBytes bytes, empty otherwise
+     */
+    private Optional<byte[]> fullyReadSmallStream(PushbackInputStream pushbackInputStream) throws IOException {
+        int sizeToRead = sizeThresholdInBytes + 1;
+        byte[] bytes = new byte[sizeToRead];
+        // A single read() may return fewer bytes than requested: keep reading until the buffer is full
+        // or EOF is reached, otherwise a short read would make a large stream look small.
+        int readByteCount = 0;
+        int read = 0;
+        while (read >= 0 && readByteCount < sizeToRead) {
+            read = pushbackInputStream.read(bytes, readByteCount, sizeToRead - readByteCount);
+            readByteCount += Math.max(read, 0);
+        }
+        try {
+            if (readByteCount > sizeThresholdInBytes) {
+                return Optional.empty();
+            }
+            return Optional.of(Arrays.copyOf(bytes, readByteCount));
+        } finally {
+            if (readByteCount > 0) {
+                pushbackInputStream.unread(bytes, 0, readByteCount);
+            }
+        }
     }
 
     /**
diff --git 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
index 6819aca..8d22d3f 100644
--- 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
+++ 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.awaitility.Awaitility.await;
 
+import java.io.ByteArrayInputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
 
@@ -107,4 +108,15 @@ public interface BlobStoreCacheContract {
         
await().atMost(Duration.TWO_SECONDS.plus(Duration.FIVE_HUNDRED_MILLISECONDS)).await().untilAsserted(()
             -> 
assertThat(Mono.from(testee().read(blobId)).blockOptional()).isEmpty());
     }
+
+    @Test
+    default void readShouldReturnEmptyCachedByteArray() {
+        BlobId blobId = blobIdFactory().randomId();
+        byte[] emptyByteArray = new byte[] {};
+
+        Mono.from(testee().cache(blobId, emptyByteArray)).block();
+
+        assertThat(new 
ByteArrayInputStream(Mono.from(testee().read(blobId)).block()))
+            .hasSameContentAs(new ByteArrayInputStream(emptyByteArray));
+    }
 }
diff --git 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
index 1c5ff09..a7808ab 100644
--- 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
+++ 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
@@ -28,6 +28,7 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.ByteArrayInputStream;
+import java.nio.charset.StandardCharsets;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
@@ -45,12 +46,15 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import com.google.common.base.Strings;
+
 import reactor.core.publisher.Mono;
 
 public class CacheBlobStoreTest implements BlobStoreContract {
 
     private static final BucketName DEFAULT_BUCKERNAME = DEFAULT;
     private static final BucketName TEST_BUCKERNAME = BucketName.of("test");
+    private static final byte[] APPROXIMATELY_FIVE_KILOBYTES = Strings.repeat("0123456789\n", 500).getBytes(StandardCharsets.UTF_8);
 
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new 
CassandraClusterExtension(
@@ -139,16 +143,50 @@ public class CacheBlobStoreTest implements 
BlobStoreContract {
     }
 
     @Test
-    public void shouldNotCacheWhenEmptyStream() {
+    public void shouldCacheWhenEmptyStream() {
         BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new 
ByteArrayInputStream(EMPTY_BYTEARRAY), SIZE_BASED)).block();
 
         SoftAssertions.assertSoftly(soflty -> {
-            
assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThat(new 
ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new
 ByteArrayInputStream(EMPTY_BYTEARRAY));
             assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, 
blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
         });
     }
 
     @Test
+    public void shouldCacheWhenEmptyByteArray() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EMPTY_BYTEARRAY, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())).hasSameContentAs(new ByteArrayInputStream(EMPTY_BYTEARRAY));
+            softly.assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
+        });
+    }
+
+    @Test
+    public void shouldCacheWhenFiveKilobytesStream() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES), SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+            softly.assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+        });
+    }
+
+    @Test
+    public void shouldCacheWhenFiveKilobytesBytes() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(softly -> {
+            softly.assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block()))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+            softly.assertThat(new ByteArrayInputStream(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()))
+                .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES));
+        });
+    }
+
+    @Test
     public void shouldRemoveBothInCacheAndBackendWhenDefaultBucketName() {
         BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, 
EIGHT_KILOBYTES, SIZE_BASED)).block();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to