This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b9121b72fde6e203af8b515ba30ae429aed51ba4 Author: ducnv <duc91....@gmail.com> AuthorDate: Tue Apr 14 17:38:33 2020 +0700 JAMES-3140: Implement CacheBlobStore --- .../blob/api/BucketDumbBlobStoreContract.java | 9 +-- .../james/blob/cassandra/CassandraBlobModule.java | 14 ++++ .../james/blob/cassandra/CassandraBucketDAO.java | 6 +- .../blob/cassandra/CassandraDefaultBucketDAO.java | 2 +- .../blob/cassandra/CassandraDumbBlobStore.java | 2 +- .../blob/cassandra/cache/CachedDumbBlobStore.java | 80 ++++++++++++++++++++++ 6 files changed, 104 insertions(+), 9 deletions(-) diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java index d560587..54d2384 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java @@ -20,6 +20,7 @@ package org.apache.james.blob.api; import static org.apache.james.blob.api.DumbBlobStoreFixture.CUSTOM_BUCKET_NAME; +import static org.apache.james.blob.api.DumbBlobStoreFixture.ELEVEN_KILOBYTES; import static org.apache.james.blob.api.DumbBlobStoreFixture.OTHER_TEST_BLOB_ID; import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_BYTEARRAY; import static org.apache.james.blob.api.DumbBlobStoreFixture.SHORT_STRING; @@ -53,7 +54,7 @@ public interface BucketDumbBlobStoreContract { default void deleteBucketShouldDeleteExistingBucketWithItsData() { DumbBlobStore store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block(); Mono.from(store.deleteBucket(TEST_BUCKET_NAME)).block(); assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).read()) @@ -117,16 +118,16 @@ public interface BucketDumbBlobStoreContract { default void readStreamShouldThrowWhenBucketDoesNotExist() { DumbBlobStore store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block(); assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID).read()) .isInstanceOf(ObjectNotFoundException.class); } @Test - default void readBytesShouldThrowWhenBucketDoesNotExist() { + default void readBytesShouldThrowWhenBucketDoesNotExistWithBigData() { DumbBlobStore store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block(); assertThatThrownBy(() -> Mono.from(store.readBytes(CUSTOM_BUCKET_NAME, TEST_BLOB_ID)).block()) .isInstanceOf(ObjectNotFoundException.class); diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java index 1975c1a..934812d 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java @@ -19,6 +19,7 @@ package org.apache.james.blob.cassandra; +import static com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.HOURS; import static org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts.DATA; import org.apache.james.backends.cassandra.components.CassandraModule; @@ -28,6 +29,7 @@ import org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts; import org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobTable; import com.datastax.driver.core.DataType; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; public interface CassandraBlobModule { CassandraModule MODULE = CassandraModule @@ -65,5 +67,17 @@ public interface CassandraBlobModule { .addPartitionKey(BucketBlobParts.ID, DataType.text()) .addClusteringColumn(BucketBlobTable.NUMBER_OF_CHUNK, DataType.cint())) + .table(BlobTables.DumbBlobCache.TABLE_NAME) + .options(options -> options + .compactionOptions(SchemaBuilder.timeWindowCompactionStrategy() + .compactionWindowSize(1) + .compactionWindowUnit(HOURS)) + .readRepairChance(0.0)) + .comment("Write through cache for small blobs stored in a slower blob store implementation which is object storage" + + "Messages` headers and bodies are stored as blobparts.") + .statement(statement -> statement + .addPartitionKey(BlobTables.DumbBlobCache.ID, DataType.text()) + .addColumn(BlobTables.DumbBlobCache.DATA, DataType.blob())) + .build(); } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java index f6d124f..0287f37 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBucketDAO.java @@ -46,7 +46,7 @@ import com.google.common.annotations.VisibleForTesting; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -class CassandraBucketDAO { +public class CassandraBucketDAO { private final BlobId.Factory blobIdFactory; private final CassandraAsyncExecutor cassandraAsyncExecutor; private final PreparedStatement insert; @@ -59,7 +59,7 @@ class CassandraBucketDAO { @Inject @VisibleForTesting - CassandraBucketDAO(BlobId.Factory blobIdFactory, Session session) { + public CassandraBucketDAO(BlobId.Factory blobIdFactory, Session session) { this.blobIdFactory = blobIdFactory; this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.insert = prepareInsert(session); @@ -168,7 +168,7 @@ class CassandraBucketDAO { .setString(BucketBlobParts.ID, blobId.asString())); } - Flux<Pair<BucketName, BlobId>> listAll() { + public Flux<Pair<BucketName, BlobId>> listAll() { return cassandraAsyncExecutor.executeRows(listAll.bind()) .map(row -> Pair.of(BucketName.of(row.getString(BUCKET)), blobIdFactory.from(row.getString(ID)))); } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java index d564066..af99a4b 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDefaultBucketDAO.java @@ -53,7 +53,7 @@ public class CassandraDefaultBucketDAO { @Inject @VisibleForTesting - CassandraDefaultBucketDAO(Session session) { + public CassandraDefaultBucketDAO(Session session) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.insert = prepareInsert(session); this.select = prepareSelect(session); diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java index 94e2bde..2e57325 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java @@ -56,7 +56,7 @@ public class CassandraDumbBlobStore implements DumbBlobStore { private final BucketName defaultBucket; @Inject - CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO, + public CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO, CassandraBucketDAO bucketDAO, CassandraConfiguration cassandraConfiguration, @Named(DEFAULT_BUCKET) BucketName defaultBucket) { diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java new file mode 100644 index 0000000..53f3df2 --- /dev/null +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java @@ -0,0 +1,80 @@ +package org.apache.james.blob.cassandra.cache; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; + +import javax.inject.Inject; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.DumbBlobStore; +import org.apache.james.blob.api.ObjectNotFoundException; +import org.apache.james.blob.api.ObjectStoreIOException; +import org.reactivestreams.Publisher; + +import com.github.fge.lambdas.Throwing; +import com.google.common.base.Preconditions; +import com.google.common.io.ByteSource; + +import reactor.core.publisher.Mono; + +public class CachedDumbBlobStore implements DumbBlobStore { + + private final DumbBlobStoreCache cache; + private final DumbBlobStore backend; + + @Inject + public CachedDumbBlobStore(DumbBlobStoreCache cache, DumbBlobStore backend) { + this.cache = cache; + this.backend = backend; + } + + @Override + public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + Preconditions.checkNotNull(bucketName, "bucketName should not be null"); + + return Mono.from(cache.read(blobId)) + .map(bytes -> (InputStream) new ByteArrayInputStream(bytes)) + .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId))) + .blockOptional() + .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId))); + } + + @Override + public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) { + return Mono.from(cache.read(blobId)) + .switchIfEmpty(Mono.from(backend.readBytes(bucketName, blobId))); + } + + @Override + public Publisher<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { + return Mono.from(cache.cache(blobId, data)) + .then(Mono.from(backend.save(bucketName, blobId, data))); + } + + @Override + public Publisher<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) { + return Mono.fromCallable(() -> inputStream) + .map(stream -> cache.cache(blobId, stream)) + .then(Mono.from(backend.save(bucketName, blobId, inputStream))); + } + + @Override + public Publisher<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) { + return Mono.from(backend.save(bucketName, blobId, content)) + .then(Mono.using(content::openBufferedStream, + inputStream -> Mono.from(cache.cache(blobId, inputStream)), + Throwing.consumer(InputStream::close).sneakyThrow())); + } + + @Override + public Publisher<Void> delete(BucketName bucketName, BlobId blobId) { + return Mono.from(backend.delete(bucketName, blobId)) + .then(Mono.from(cache.remove(blobId))); + } + + @Override + public Publisher<Void> deleteBucket(BucketName bucketName) { + return Mono.from(backend.deleteBucket(bucketName)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org