This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b9121b72fde6e203af8b515ba30ae429aed51ba4
Author: ducnv <duc91....@gmail.com>
AuthorDate: Tue Apr 14 17:38:33 2020 +0700

    JAMES-3140: Implement CacheBlobStore
---
 .../blob/api/BucketDumbBlobStoreContract.java      |  9 +--
 .../james/blob/cassandra/CassandraBlobModule.java  | 14 ++++
 .../james/blob/cassandra/CassandraBucketDAO.java   |  6 +-
 .../blob/cassandra/CassandraDefaultBucketDAO.java  |  2 +-
 .../blob/cassandra/CassandraDumbBlobStore.java     |  2 +-
 .../blob/cassandra/cache/CachedDumbBlobStore.java  | 80 ++++++++++++++++++++++
 6 files changed, 104 insertions(+), 9 deletions(-)

diff --git 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
index d560587..54d2384 100644
--- 
a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
+++ 
b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java
@@ -20,6 +20,7 @@
 package org.apache.james.blob.api;
 
 import static 
org.apache.james.blob.api.DumbBlobStoreFixture.CUSTOM_BUCKET_NAME;
+import static org.apache.james.blob.api.DumbBlobStoreFixture.ELEVEN_KILOBYTES;
 import static 
org.apache.james.blob.api.DumbBlobStoreFixture.OTHER_TEST_BLOB_ID;
 import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_BYTEARRAY;
 import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_STRING;
@@ -53,7 +54,7 @@ public interface BucketDumbBlobStoreContract {
     default void deleteBucketShouldDeleteExistingBucketWithItsData() {
         DumbBlobStore store = testee();
 
-        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
SHORT_BYTEARRAY)).block();
+        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ELEVEN_KILOBYTES)).block();
         Mono.from(store.deleteBucket(TEST_BUCKET_NAME)).block();
 
         assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, 
TEST_BLOB_ID).read())
@@ -117,16 +118,16 @@ public interface BucketDumbBlobStoreContract {
     default void readStreamShouldThrowWhenBucketDoesNotExist() {
         DumbBlobStore store = testee();
 
-        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
SHORT_BYTEARRAY)).block();
+        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ELEVEN_KILOBYTES)).block();
         assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, 
TEST_BLOB_ID).read())
             .isInstanceOf(ObjectNotFoundException.class);
     }
 
     @Test
-    default void readBytesShouldThrowWhenBucketDoesNotExist() {
+    default void readBytesShouldThrowWhenBucketDoesNotExistWithBigData() {
         DumbBlobStore store = testee();
 
-        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
SHORT_BYTEARRAY)).block();
+        Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, 
ELEVEN_KILOBYTES)).block();
 
         assertThatThrownBy(() -> Mono.from(store.readBytes(CUSTOM_BUCKET_NAME, 
TEST_BLOB_ID)).block())
             .isInstanceOf(ObjectNotFoundException.class);
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
index 1975c1a..934812d 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.blob.cassandra;
 
+import static 
com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.HOURS;
 import static 
org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts.DATA;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
@@ -28,6 +29,7 @@ import 
org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts;
 import org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobTable;
 
 import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
 
 public interface CassandraBlobModule {
     CassandraModule MODULE = CassandraModule
@@ -65,5 +67,17 @@ public interface CassandraBlobModule {
             .addPartitionKey(BucketBlobParts.ID, DataType.text())
             .addClusteringColumn(BucketBlobTable.NUMBER_OF_CHUNK, 
DataType.cint()))
 
+        .table(BlobTables.DumbBlobCache.TABLE_NAME)
+        .options(options -> options
+            .compactionOptions(SchemaBuilder.timeWindowCompactionStrategy()
+                .compactionWindowSize(1)
+                .compactionWindowUnit(HOURS))
+            .readRepairChance(0.0))
+        .comment("Write through cache for small blobs stored in a slower blob 
store implementation which is object storage" +
+            "Messages` headers and bodies are stored as blobparts.")
+        .statement(statement -> statement
+            .addPartitionKey(BlobTables.DumbBlobCache.ID, DataType.text())
+            .addColumn(BlobTables.DumbBlobCache.DATA, DataType.blob()))
+
         .build();
 }
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
index f6d124f..0287f37 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java
@@ -46,7 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-class CassandraBucketDAO {
+public class CassandraBucketDAO {
     private final BlobId.Factory blobIdFactory;
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final PreparedStatement insert;
@@ -59,7 +59,7 @@ class CassandraBucketDAO {
 
     @Inject
     @VisibleForTesting
-    CassandraBucketDAO(BlobId.Factory blobIdFactory, Session session) {
+    public CassandraBucketDAO(BlobId.Factory blobIdFactory, Session session) {
         this.blobIdFactory = blobIdFactory;
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.insert = prepareInsert(session);
@@ -168,7 +168,7 @@ class CassandraBucketDAO {
                 .setString(BucketBlobParts.ID, blobId.asString()));
     }
 
-    Flux<Pair<BucketName, BlobId>> listAll() {
+    public Flux<Pair<BucketName, BlobId>> listAll() {
         return cassandraAsyncExecutor.executeRows(listAll.bind())
             .map(row -> Pair.of(BucketName.of(row.getString(BUCKET)), 
blobIdFactory.from(row.getString(ID))));
     }
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
index d564066..af99a4b 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java
@@ -53,7 +53,7 @@ public class CassandraDefaultBucketDAO {
 
     @Inject
     @VisibleForTesting
-    CassandraDefaultBucketDAO(Session session) {
+    public CassandraDefaultBucketDAO(Session session) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.insert = prepareInsert(session);
         this.select = prepareSelect(session);
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
index 94e2bde..2e57325 100644
--- 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java
@@ -56,7 +56,7 @@ public class CassandraDumbBlobStore implements DumbBlobStore {
     private final BucketName defaultBucket;
 
     @Inject
-    CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO,
+    public CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO,
                            CassandraBucketDAO bucketDAO,
                            CassandraConfiguration cassandraConfiguration,
                            @Named(DEFAULT_BUCKET) BucketName defaultBucket) {
diff --git 
a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java
 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java
new file mode 100644
index 0000000..53f3df2
--- /dev/null
+++ 
b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java
@@ -0,0 +1,80 @@
+package org.apache.james.blob.cassandra.cache;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+import javax.inject.Inject;
+
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.DumbBlobStore;
+import org.apache.james.blob.api.ObjectNotFoundException;
+import org.apache.james.blob.api.ObjectStoreIOException;
+import org.reactivestreams.Publisher;
+
+import com.github.fge.lambdas.Throwing;
+import com.google.common.base.Preconditions;
+import com.google.common.io.ByteSource;
+
+import reactor.core.publisher.Mono;
+
+/**
+ * {@link DumbBlobStore} decorator that puts a {@link DumbBlobStoreCache} in front of another
+ * {@link DumbBlobStore}.
+ *
+ * <p>Reads are tried against the cache first and fall back to the backend when the cache
+ * yields nothing. Writes go to the backend first and are then mirrored into the cache, so the
+ * cache never holds a blob the backend refused to store.
+ *
+ * <p>The cache is addressed by {@link BlobId} only: the bucket name takes no part in cache
+ * lookups, writes or removals. As a consequence a cached blob is served whatever bucket is
+ * asked for, and {@link #deleteBucket(BucketName)} cannot evict the entries of the deleted bucket.
+ */
+public class CachedDumbBlobStore implements DumbBlobStore {
+
+    private final DumbBlobStoreCache cache;
+    private final DumbBlobStore backend;
+
+    /**
+     * @param cache   cache consulted before the backend and populated on save
+     * @param backend blob store holding the authoritative copy of every blob
+     */
+    @Inject
+    public CachedDumbBlobStore(DumbBlobStoreCache cache, DumbBlobStore backend) {
+        this.cache = cache;
+        this.backend = backend;
+    }
+
+    /**
+     * Opens a stream on the blob content, served from the cache when present and from the
+     * backend otherwise. This call blocks until one of the two sources has answered.
+     *
+     * @throws NullPointerException    if {@code bucketName} is null
+     * @throws ObjectNotFoundException if neither the cache nor the backend yields a stream
+     */
+    @Override
+    public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException {
+        Preconditions.checkNotNull(bucketName, "bucketName should not be null");
+
+        return Mono.from(cache.read(blobId))
+            .map(bytes -> (InputStream) new ByteArrayInputStream(bytes))
+            // fromCallable is lazy: the backend is only hit on a cache miss
+            .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId)))
+            .blockOptional()
+            .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId)));
+    }
+
+    /**
+     * Emits the blob content, served from the cache when present and from the backend otherwise.
+     */
+    @Override
+    public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) {
+        return Mono.from(cache.read(blobId))
+            // defer so that backend.readBytes is not even invoked on a cache hit
+            .switchIfEmpty(Mono.defer(() -> Mono.from(backend.readBytes(bucketName, blobId))));
+    }
+
+    /**
+     * Saves {@code data} in the backend, then mirrors it into the cache.
+     */
+    @Override
+    public Publisher<Void> save(BucketName bucketName, BlobId blobId, byte[] data) {
+        // Backend first, consistently with the ByteSource variant: a failed backend write must
+        // not leave a cached blob that the backend does not hold.
+        return Mono.from(backend.save(bucketName, blobId, data))
+            .then(Mono.defer(() -> Mono.from(cache.cache(blobId, data))));
+    }
+
+    /**
+     * Saves the content of {@code inputStream} in the backend, then mirrors it into the cache.
+     *
+     * <p>A stream can only be consumed once, so it is drained into a byte array which is then
+     * handed to both stores through {@link #save(BucketName, BlobId, byte[])}.
+     * NOTE(review): this holds the whole blob in memory; confirm this is acceptable for the
+     * blob sizes routed through this store.
+     *
+     * @throws NullPointerException if {@code inputStream} is null
+     */
+    @Override
+    public Publisher<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) {
+        Preconditions.checkNotNull(inputStream, "inputStream should not be null");
+
+        return Mono.fromCallable(() -> com.google.common.io.ByteStreams.toByteArray(inputStream))
+            .flatMap(bytes -> Mono.from(save(bucketName, blobId, bytes)));
+    }
+
+    /**
+     * Saves {@code content} in the backend, then mirrors it into the cache using a fresh stream
+     * opened from the same source. That stream is closed once the cache write terminates.
+     */
+    @Override
+    public Publisher<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) {
+        return Mono.from(backend.save(bucketName, blobId, content))
+            .then(Mono.using(content::openBufferedStream,
+                inputStream -> Mono.from(cache.cache(blobId, inputStream)),
+                Throwing.consumer(InputStream::close).sneakyThrow()));
+    }
+
+    /**
+     * Deletes the blob from the backend, then removes it from the cache.
+     */
+    @Override
+    public Publisher<Void> delete(BucketName bucketName, BlobId blobId) {
+        return Mono.from(backend.delete(bucketName, blobId))
+            .then(Mono.from(cache.remove(blobId)));
+    }
+
+    /**
+     * Deletes the bucket from the backend.
+     *
+     * <p>The cache is left untouched: it is keyed by blob id alone, so the entries that belonged
+     * to the deleted bucket cannot be identified here and stay readable through this store.
+     */
+    @Override
+    public Publisher<Void> deleteBucket(BucketName bucketName) {
+        return Mono.from(backend.deleteBucket(bucketName));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to