This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 3c4818289ca8777528284ad9470322538b4f2e95
Author: ducnv <duc91....@gmail.com>
AuthorDate: Tue Apr 14 17:38:58 2020 +0700

    JAMES-3140: Implement CacheBlobStoreTest
---
 .../blob/api/BucketDumbBlobStoreContract.java      |   6 +-
 .../apache/james/blob/cassandra/BlobTables.java    |   2 +-
 .../james/blob/cassandra/CassandraBlobModule.java  |  14 --
 .../blob/cassandra/CassandraDumbBlobStore.java     |  10 +-
 ...DumbBlobStoreCache.java => BlobStoreCache.java} |   2 +-
 .../james/blob/cassandra/cache/CacheBlobStore.java | 163 +++++++++++++++++++++
 .../blob/cassandra/cache/CachedDumbBlobStore.java  |  80 ----------
 ...heModule.java => CassandraBlobCacheModule.java} |  12 +-
 ...toreCache.java => CassandraBlobStoreCache.java} |  10 +-
 ...heContract.java => BlobStoreCacheContract.java} |   5 +-
 .../blob/cassandra/cache/CacheBlobStoreTest.java   | 163 +++++++++++++++++++++
 ...eTest.java => CassandraBlobStoreCacheTest.java} |  10 +-
 12 files changed, 357 insertions(+), 120 deletions(-)

diff --git 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
index 54d2384..8695118 100644
--- 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
+++ 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
@@ -54,7 +54,7 @@ public interface BucketDumbBlobStoreContract {
     default void deleteBucketShouldDeleteExistingBucketWithItsData() {
         DumbBlobStore store = testee();
 
-        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ELEVEN_KILOBYTES)).block();
+        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
SHORT_BYTEARRAY)).block();
         Mono.from(store.deleteBucket(TEST_BUCKET_NAME)).block();
 
         assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, 
TEST_BLOB_ID).read())
@@ -118,7 +118,7 @@ public interface BucketDumbBlobStoreContract {
     default void readStreamShouldThrowWhenBucketDoesNotExist() {
         DumbBlobStore store = testee();
 
-        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ELEVEN_KILOBYTES)).block();
+        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
SHORT_BYTEARRAY)).block();
         assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, 
TEST_BLOB_ID).read())
             .isInstanceOf(ObjectNotFoundException.class);
     }
@@ -127,7 +127,7 @@ public interface BucketDumbBlobStoreContract {
     default void readBytesShouldThrowWhenBucketDoesNotExistWithBigData() {
         DumbBlobStore store = testee();
 
-        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ELEVEN_KILOBYTES)).block();
+        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
SHORT_BYTEARRAY)).block();
 
         assertThatThrownBy(() -> Mono.from(store.readBytes(CUSTOM_BUCKET_NAME, 
TEST_BLOB_ID)).block())
             .isInstanceOf(ObjectNotFoundException.class);
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java
index 92e6756..09a6823 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java
@@ -49,7 +49,7 @@ public interface BlobTables {
         String DATA = "data";
     }
 
-    interface DumbBlobCache {
+    interface BlobStoreCache {
         String TABLE_NAME = "blob_cache";
         String ID = "id";
         String DATA = "data";
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
index 934812d..1975c1a 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.blob.cassandra;
 
-import static 
com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.HOURS;
 import static 
org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts.DATA;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
@@ -29,7 +28,6 @@ import 
org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts;
 import org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobTable;
 
 import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.schemabuilder.SchemaBuilder;
 
 public interface CassandraBlobModule {
     CassandraModule MODULE = CassandraModule
@@ -67,17 +65,5 @@ public interface CassandraBlobModule {
             .addPartitionKey(BucketBlobParts.ID, DataType.text())
             .addClusteringColumn(BucketBlobTable.NUMBER_OF_CHUNK, 
DataType.cint()))
 
-        .table(BlobTables.DumbBlobCache.TABLE_NAME)
-        .options(options -> options
-            .compactionOptions(SchemaBuilder.timeWindowCompactionStrategy()
-                .compactionWindowSize(1)
-                .compactionWindowUnit(HOURS))
-            .readRepairChance(0.0))
-        .comment("Write through cache for small blobs stored in a slower blob 
store implementation which is object storage" +
-            "Messages` headers and bodies are stored as blobparts.")
-        .statement(statement -> statement
-            .addPartitionKey(BlobTables.DumbBlobCache.ID, DataType.text())
-            .addColumn(BlobTables.DumbBlobCache.DATA, DataType.blob()))
-
         .build();
 }
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
index 2e57325..240a514 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
@@ -21,6 +21,7 @@ package org.apache.james.blob.cassandra;
 
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.NoSuchElementException;
 
@@ -38,7 +39,9 @@ import org.apache.james.blob.cassandra.utils.DataChunker;
 import org.apache.james.util.ReactorUtils;
 
 import com.github.fge.lambdas.Throwing;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.io.ByteSource;
 
 import reactor.core.publisher.Flux;
@@ -56,10 +59,11 @@ public class CassandraDumbBlobStore implements 
DumbBlobStore {
     private final BucketName defaultBucket;
 
     @Inject
+    @VisibleForTesting
     public CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO,
-                           CassandraBucketDAO bucketDAO,
-                           CassandraConfiguration cassandraConfiguration,
-                           @Named(DEFAULT_BUCKET) BucketName defaultBucket) {
+                                  CassandraBucketDAO bucketDAO,
+                                  CassandraConfiguration 
cassandraConfiguration,
+                                  @Named(DEFAULT_BUCKET) BucketName 
defaultBucket) {
         this.defaultBucketDAO = defaultBucketDAO;
         this.bucketDAO = bucketDAO;
         this.configuration = cassandraConfiguration;
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCache.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/BlobStoreCache.java
similarity index 97%
rename from 
server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCache.java
rename to 
server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/BlobStoreCache.java
index de60a33..9aec0b0 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCache.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/BlobStoreCache.java
@@ -21,7 +21,7 @@ package org.apache.james.blob.cassandra.cache;
 import org.apache.james.blob.api.BlobId;
 import org.reactivestreams.Publisher;
 
-public interface DumbBlobStoreCache {
+public interface BlobStoreCache {
     Publisher<Void> cache(BlobId blobId, byte[] data);
 
     Publisher<byte[]> read(BlobId blobId);
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
new file mode 100644
index 0000000..0acc95e
--- /dev/null
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java
@@ -0,0 +1,163 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.blob.cassandra.cache;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.api.ObjectStoreIOException;
+import org.reactivestreams.Publisher;
+
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Mono;
+
+public class CacheBlobStore implements BlobStore {
+
+    private static final String DEFAULT_BUCKET = "cassandraDefault";
+
+    private final BlobStoreCache cache;
+    private final BlobStore backend;
+    private final Integer sizeThresholdInBytes;
+    private final BucketName defaultBucket;
+
+    @Inject
+    public CacheBlobStore(BlobStoreCache cache, BlobStore backend,
+                          CassandraCacheConfiguration cacheConfiguration,
+                          @Named(DEFAULT_BUCKET) BucketName defaultBucket) {
+        this.cache = cache;
+        this.backend = backend;
+        this.sizeThresholdInBytes = 
cacheConfiguration.getSizeThresholdInBytes();
+        this.defaultBucket = defaultBucket;
+    }
+
+    @Override
+    public InputStream read(BucketName bucketName, BlobId blobId) throws 
ObjectStoreIOException, ObjectNotFoundException {
+        Preconditions.checkNotNull(bucketName, "bucketName should not be 
null");
+
+        return Mono.just(bucketName)
+            .filter(defaultBucket::equals)
+            .flatMap(ignored ->
+                Mono.from(cache.read(blobId))
+                    .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new 
ByteArrayInputStream(bytes)))
+            )
+            .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, 
blobId)))
+            .blockOptional()
+            .orElseThrow(() -> new 
ObjectNotFoundException(String.format("Could not retrieve blob metadata for 
%s", blobId)));
+    }
+
+    @Override
+    public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
+        return Mono.just(bucketName)
+            .filter(defaultBucket::equals)
+            .flatMap(ignored -> Mono.from(cache.read(blobId)))
+            .switchIfEmpty(Mono.from(backend.readBytes(bucketName, blobId)));
+    }
+
+    @Override
+    public Mono<BlobId> save(BucketName bucketName, byte[] bytes, 
StoragePolicy storagePolicy) {
+        return Mono.from(backend.save(bucketName, bytes, storagePolicy))
+            .flatMap(blobId -> {
+                if (isAbleToCache(bucketName, bytes, storagePolicy)) {
+                    return Mono.from(cache.cache(blobId, 
bytes)).thenReturn(blobId);
+                }
+                return Mono.just(blobId);
+            });
+    }
+
+    @Override
+    public Publisher<BlobId> save(BucketName bucketName, InputStream 
inputStream, StoragePolicy storagePolicy) {
+        Preconditions.checkNotNull(inputStream, "InputStream must not be 
null");
+
+        PushbackInputStream pushbackInputStream = new 
PushbackInputStream(inputStream, sizeThresholdInBytes + 1);
+        return Mono.from(backend.save(bucketName, pushbackInputStream, 
storagePolicy))
+            .flatMap(blobId ->
+                Mono.fromCallable(() -> isALargeStream(pushbackInputStream))
+                    .flatMap(largeStream -> {
+                        if (!largeStream) {
+                            return Mono.from(saveInCache(bucketName, blobId, 
pushbackInputStream, storagePolicy))
+                                .thenReturn(blobId);
+                        }
+                        return Mono.just(blobId);
+                    })
+            );
+    }
+
+    @Override
+    public BucketName getDefaultBucketName() {
+        return defaultBucket;
+    }
+
+    @Override
+    public Mono<Void> delete(BucketName bucketName, BlobId blobId) {
+        return Mono.from(backend.delete(bucketName, blobId))
+            .then(Mono.just(bucketName)
+                .filter(defaultBucket::equals)
+                .flatMap(ignored -> Mono.from(cache.remove(blobId)))
+                .then());
+    }
+
+    @Override
+    public Publisher<Void> deleteBucket(BucketName bucketName) {
+        return Mono.from(backend.deleteBucket(bucketName));
+    }
+
+    private Mono<Void> saveInCache(BucketName bucketName, BlobId blobId, 
PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) {
+        return Mono.fromCallable(() -> 
copyBytesFromStream(pushbackInputStream))
+            .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy))
+            .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes)));
+    }
+
+    private byte[] copyBytesFromStream(PushbackInputStream 
pushbackInputStream) throws IOException {
+        byte[] bytes = new byte[pushbackInputStream.available()];
+        int read = IOUtils.read(pushbackInputStream, bytes);
+        pushbackInputStream.unread(read);
+        return bytes;
+    }
+
+    private boolean isALargeStream(PushbackInputStream pushbackInputStream) 
throws IOException {
+        pushbackInputStream.mark(0);
+        long skip = pushbackInputStream.skip(sizeThresholdInBytes + 1);
+        pushbackInputStream.unread(Math.toIntExact(skip));
+        return skip >= sizeThresholdInBytes;
+    }
+
+    /**
+     * bytes: the byte[] read from the PushbackInputStream. If the PushbackInputStream is empty, 
bytes.length == 1.
+     */
+    private boolean isAbleToCache(BucketName bucketName, byte[] bytes, 
StoragePolicy storagePolicy) {
+        return isAbleToCache(bucketName, storagePolicy) && bytes.length <= 
sizeThresholdInBytes && bytes.length > 1;
+    }
+
+    private boolean isAbleToCache(BucketName bucketName, StoragePolicy 
storagePolicy) {
+        return defaultBucket.equals(bucketName) && 
!storagePolicy.equals(LOW_COST);
+    }
+}
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java
deleted file mode 100644
index 53f3df2..0000000
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package org.apache.james.blob.cassandra.cache;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-
-import javax.inject.Inject;
-
-import org.apache.james.blob.api.BlobId;
-import org.apache.james.blob.api.BucketName;
-import org.apache.james.blob.api.DumbBlobStore;
-import org.apache.james.blob.api.ObjectNotFoundException;
-import org.apache.james.blob.api.ObjectStoreIOException;
-import org.reactivestreams.Publisher;
-
-import com.github.fge.lambdas.Throwing;
-import com.google.common.base.Preconditions;
-import com.google.common.io.ByteSource;
-
-import reactor.core.publisher.Mono;
-
-public class CachedDumbBlobStore implements DumbBlobStore {
-
-    private final DumbBlobStoreCache cache;
-    private final DumbBlobStore backend;
-
-    @Inject
-    public CachedDumbBlobStore(DumbBlobStoreCache cache, DumbBlobStore 
backend) {
-        this.cache = cache;
-        this.backend = backend;
-    }
-
-    @Override
-    public InputStream read(BucketName bucketName, BlobId blobId) throws 
ObjectStoreIOException, ObjectNotFoundException {
-        Preconditions.checkNotNull(bucketName, "bucketName should not be 
null");
-
-        return Mono.from(cache.read(blobId))
-            .map(bytes -> (InputStream) new ByteArrayInputStream(bytes))
-            .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, 
blobId)))
-            .blockOptional()
-            .orElseThrow(() -> new 
ObjectNotFoundException(String.format("Could not retrieve blob metadata for 
%s", blobId)));
-    }
-
-    @Override
-    public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
-        return Mono.from(cache.read(blobId))
-            .switchIfEmpty(Mono.from(backend.readBytes(bucketName, blobId)));
-    }
-
-    @Override
-    public Publisher<Void> save(BucketName bucketName, BlobId blobId, byte[] 
data) {
-        return Mono.from(cache.cache(blobId, data))
-            .then(Mono.from(backend.save(bucketName, blobId, data)));
-    }
-
-    @Override
-    public Publisher<Void> save(BucketName bucketName, BlobId blobId, 
InputStream inputStream) {
-        return Mono.fromCallable(() -> inputStream)
-            .map(stream -> cache.cache(blobId, stream))
-            .then(Mono.from(backend.save(bucketName, blobId, inputStream)));
-    }
-
-    @Override
-    public Publisher<Void> save(BucketName bucketName, BlobId blobId, 
ByteSource content) {
-        return Mono.from(backend.save(bucketName, blobId, content))
-            .then(Mono.using(content::openBufferedStream,
-                inputStream -> Mono.from(cache.cache(blobId, inputStream)),
-                Throwing.consumer(InputStream::close).sneakyThrow()));
-    }
-
-    @Override
-    public Publisher<Void> delete(BucketName bucketName, BlobId blobId) {
-        return Mono.from(backend.delete(bucketName, blobId))
-            .then(Mono.from(cache.remove(blobId)));
-    }
-
-    @Override
-    public Publisher<Void> deleteBucket(BucketName bucketName) {
-        return Mono.from(backend.deleteBucket(bucketName));
-    }
-}
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobCacheModule.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobCacheModule.java
similarity index 83%
rename from 
server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobCacheModule.java
rename to 
server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobCacheModule.java
index 6d4734e..3f8fa12 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobCacheModule.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobCacheModule.java
@@ -20,20 +20,22 @@
 package org.apache.james.blob.cassandra.cache;
 
 import static 
com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.HOURS;
+import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.DATA;
+import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.ID;
+import static 
org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.TABLE_NAME;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
-import org.apache.james.blob.cassandra.BlobTables;
 
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.schemabuilder.SchemaBuilder;
 
-public interface CassandraDumbBlobCacheModule {
+public interface CassandraBlobCacheModule {
 
     double NO_READ_REPAIR = 0d;
 
     CassandraModule MODULE = CassandraModule
         .builder()
-        .table(BlobTables.DumbBlobCache.TABLE_NAME)
+        .table(TABLE_NAME)
         .options(options -> options
             .compactionOptions(SchemaBuilder.timeWindowCompactionStrategy()
                 .compactionWindowSize(1)
@@ -41,7 +43,7 @@ public interface CassandraDumbBlobCacheModule {
             .readRepairChance(NO_READ_REPAIR))
         .comment("Write through cache for small blobs stored in a slower blob 
store implementation.")
         .statement(statement -> statement
-            .addPartitionKey(BlobTables.DumbBlobCache.ID, DataType.text())
-            .addColumn(BlobTables.DumbBlobCache.DATA, DataType.blob()))
+            .addPartitionKey(ID, DataType.text())
+            .addColumn(DATA, DataType.blob()))
         .build();
 }
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCache.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java
similarity index 92%
rename from 
server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCache.java
rename to 
server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java
index 74bd73b..ce3237e 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCache.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java
@@ -28,9 +28,9 @@ import static 
com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl;
 import static org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.ID;
-import static org.apache.james.blob.cassandra.BlobTables.DumbBlobCache.DATA;
-import static 
org.apache.james.blob.cassandra.BlobTables.DumbBlobCache.TABLE_NAME;
-import static 
org.apache.james.blob.cassandra.BlobTables.DumbBlobCache.TTL_FOR_ROW;
+import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.DATA;
+import static 
org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.TABLE_NAME;
+import static 
org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.TTL_FOR_ROW;
 
 import java.nio.ByteBuffer;
 
@@ -46,7 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import reactor.core.publisher.Mono;
 
-public class CassandraDumbBlobStoreCache implements DumbBlobStoreCache {
+public class CassandraBlobStoreCache implements BlobStoreCache {
 
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement insertStatement;
@@ -58,7 +58,7 @@ public class CassandraDumbBlobStoreCache implements 
DumbBlobStoreCache {
 
     @Inject
     @VisibleForTesting
-    CassandraDumbBlobStoreCache(Session session, CassandraCacheConfiguration 
cacheConfiguration) {
+    CassandraBlobStoreCache(Session session, CassandraCacheConfiguration 
cacheConfiguration) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.insertStatement = prepareInsert(session);
         this.selectStatement = prepareSelect(session);
diff --git 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCacheContract.java
 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
similarity index 98%
rename from 
server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCacheContract.java
rename to 
server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
index e4101f6..6819aca 100644
--- 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCacheContract.java
+++ 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java
@@ -33,11 +33,11 @@ import com.google.common.base.Strings;
 
 import reactor.core.publisher.Mono;
 
-public interface DumbBlobStoreCacheContract {
+public interface BlobStoreCacheContract {
 
     byte[] EIGHT_KILOBYTES = Strings.repeat("01234567\n", 
1024).getBytes(StandardCharsets.UTF_8);
 
-    DumbBlobStoreCache testee();
+    BlobStoreCache testee();
 
     BlobId.Factory blobIdFactory();
 
@@ -64,7 +64,6 @@ public interface DumbBlobStoreCacheContract {
     default void shouldReturnEmptyWhenReadWithTimeOut() {
         BlobId blobId = blobIdFactory().randomId();
         Mono.from(testee().cache(blobId, EIGHT_KILOBYTES)).block();
-
     }
 
     @Test
diff --git 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
new file mode 100644
index 0000000..1c5ff09
--- /dev/null
+++ 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java
@@ -0,0 +1,163 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.blob.cassandra.cache;
+
+import static 
org.apache.james.blob.api.BlobStore.StoragePolicy.HIGH_PERFORMANCE;
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED;
+import static org.apache.james.blob.api.BucketName.DEFAULT;
+import static 
org.apache.james.blob.cassandra.cache.BlobStoreCacheContract.EIGHT_KILOBYTES;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreContract;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.cassandra.CassandraBlobModule;
+import org.apache.james.blob.cassandra.CassandraBlobStore;
+import org.assertj.core.api.SoftAssertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.core.publisher.Mono;
+
+public class CacheBlobStoreTest implements BlobStoreContract {
+
+    private static final BucketName DEFAULT_BUCKERNAME = DEFAULT;
+    private static final BucketName TEST_BUCKERNAME = BucketName.of("test");
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new 
CassandraClusterExtension(
+        CassandraModule.aggregateModules(CassandraBlobModule.MODULE, 
CassandraBlobCacheModule.MODULE));
+
+    private BlobStore testee;
+    private BlobStore backend;
+    private BlobStoreCache cache;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        backend = CassandraBlobStore.forTesting(cassandra.getConf());
+        CassandraCacheConfiguration cacheConfig = new 
CassandraCacheConfiguration.Builder()
+            .sizeThresholdInBytes(EIGHT_KILOBYTES.length + 1)
+            .build();
+        cache = new CassandraBlobStoreCache(cassandra.getConf(), cacheConfig);
+        testee = new CacheBlobStore(cache, backend, cacheConfig, DEFAULT);
+    }
+
+    @Override
+    public BlobStore testee() {
+        return testee;
+    }
+
+    @Override
+    public BlobId.Factory blobIdFactory() {
+        return new HashBlobId.Factory();
+    }
+
+    @Test
+    public void shouldCacheWhenDefaultBucketName() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, 
EIGHT_KILOBYTES, SIZE_BASED)).block();
+
+        byte[] actual = Mono.from(cache.read(blobId)).block();
+        assertThat(actual).containsExactly(EIGHT_KILOBYTES);
+    }
+
+    @Test
+    public void shouldNotCacheWhenNotDefaultBucketName() {
+        BlobId blobId = Mono.from(testee().save(TEST_BUCKERNAME, 
EIGHT_KILOBYTES, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(ignored -> {
+            
assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThat(Mono.from(backend.readBytes(TEST_BUCKERNAME, 
blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+        });
+    }
+
+    @Test
+    public void shouldNotCacheWhenDefaultBucketNameAndBigByteDataAndSizeBase() 
{
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, 
TWELVE_MEGABYTES, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(ignored -> {
+            
assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, 
blobId)).block()).containsExactly(TWELVE_MEGABYTES);
+        });
+    }
+
+    @Test
+    public void shouldSavedBothInCacheAndBackendWhenSizeBase() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, 
EIGHT_KILOBYTES, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(soflty -> {
+            
assertThat(Mono.from(cache.read(blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, 
blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+        });
+    }
+
+    @Test
+    public void shouldSavedBothInCacheAndBackendWhenHighPerformance() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, 
EIGHT_KILOBYTES, HIGH_PERFORMANCE)).block();
+
+        SoftAssertions.assertSoftly(soflty -> {
+            
assertThat(Mono.from(cache.read(blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, 
blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+        });
+    }
+
+    @Test
+    public void shouldNotCacheWhenLowCost() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, 
EIGHT_KILOBYTES, LOW_COST)).block();
+
+        SoftAssertions.assertSoftly(soflty -> {
+            
assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, 
blobId)).block()).containsExactly(EIGHT_KILOBYTES);
+        });
+    }
+
+    @Test
+    public void shouldNotCacheWhenEmptyStream() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new 
ByteArrayInputStream(EMPTY_BYTEARRAY), SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(soflty -> {
+            
assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, 
blobId)).block()).containsExactly(EMPTY_BYTEARRAY);
+        });
+    }
+
+    @Test
+    public void shouldRemoveBothInCacheAndBackendWhenDefaultBucketName() {
+        BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, 
EIGHT_KILOBYTES, SIZE_BASED)).block();
+
+        SoftAssertions.assertSoftly(ignored -> {
+            assertThatCode(Mono.from(testee().delete(DEFAULT_BUCKERNAME, 
blobId))::block)
+                .doesNotThrowAnyException();
+            
assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty();
+            assertThatThrownBy(() -> 
Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block())
+                .isInstanceOf(ObjectNotFoundException.class);
+        });
+    }
+}
diff --git 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCacheTest.java
 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCacheTest.java
similarity index 88%
rename from 
server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCacheTest.java
rename to 
server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCacheTest.java
index 5b88799..efdc705 100644
--- 
a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCacheTest.java
+++ 
b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCacheTest.java
@@ -27,16 +27,16 @@ import org.apache.james.blob.api.HashBlobId;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-public class CassandraDumbBlobStoreCacheTest implements 
DumbBlobStoreCacheContract {
+public class CassandraBlobStoreCacheTest implements BlobStoreCacheContract {
 
     @RegisterExtension
-    static CassandraClusterExtension cassandraCluster = new 
CassandraClusterExtension(CassandraDumbBlobCacheModule.MODULE);
+    static CassandraClusterExtension cassandraCluster = new 
CassandraClusterExtension(CassandraBlobCacheModule.MODULE);
 
     private final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(50);
     private final int DEFAULT_THRESHOLD_IN_BYTES = EIGHT_KILOBYTES.length;
     private final Duration _2_SEC_TTL = Duration.ofSeconds(2);
 
-    private DumbBlobStoreCache testee;
+    private BlobStoreCache testee;
     private HashBlobId.Factory blobIdFactory;
 
     @BeforeEach
@@ -47,11 +47,11 @@ public class CassandraDumbBlobStoreCacheTest implements 
DumbBlobStoreCacheContra
             .timeOut(DEFAULT_READ_TIMEOUT)
             .ttl(_2_SEC_TTL)
             .build();
-        testee = new CassandraDumbBlobStoreCache(cassandra.getConf(), 
cacheConfiguration);
+        testee = new CassandraBlobStoreCache(cassandra.getConf(), 
cacheConfiguration);
     }
 
     @Override
-    public DumbBlobStoreCache testee() {
+    public BlobStoreCache testee() {
         return testee;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to