This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 3c4818289ca8777528284ad9470322538b4f2e95 Author: ducnv <duc91....@gmail.com> AuthorDate: Tue Apr 14 17:38:58 2020 +0700 JAMES-3140: Implement CacheBlobStoreTest --- .../blob/api/BucketDumbBlobStoreContract.java | 6 +- .../apache/james/blob/cassandra/BlobTables.java | 2 +- .../james/blob/cassandra/CassandraBlobModule.java | 14 -- .../blob/cassandra/CassandraDumbBlobStore.java | 10 +- ...DumbBlobStoreCache.java => BlobStoreCache.java} | 2 +- .../james/blob/cassandra/cache/CacheBlobStore.java | 163 +++++++++++++++++++++ .../blob/cassandra/cache/CachedDumbBlobStore.java | 80 ---------- ...heModule.java => CassandraBlobCacheModule.java} | 12 +- ...toreCache.java => CassandraBlobStoreCache.java} | 10 +- ...heContract.java => BlobStoreCacheContract.java} | 5 +- .../blob/cassandra/cache/CacheBlobStoreTest.java | 163 +++++++++++++++++++++ ...eTest.java => CassandraBlobStoreCacheTest.java} | 10 +- 12 files changed, 357 insertions(+), 120 deletions(-) diff --git a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java index 54d2384..8695118 100644 --- a/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java +++ b/server/blob/blob-api/src/test/java/org/apache/james/blob/api/BucketDumbBlobStoreContract.java @@ -54,7 +54,7 @@ public interface BucketDumbBlobStoreContract { default void deleteBucketShouldDeleteExistingBucketWithItsData() { DumbBlobStore store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); Mono.from(store.deleteBucket(TEST_BUCKET_NAME)).block(); assertThatThrownBy(() -> store.read(TEST_BUCKET_NAME, TEST_BLOB_ID).read()) @@ -118,7 +118,7 @@ public interface BucketDumbBlobStoreContract { default void readStreamShouldThrowWhenBucketDoesNotExist() { DumbBlobStore store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); assertThatThrownBy(() -> store.read(CUSTOM_BUCKET_NAME, TEST_BLOB_ID).read()) .isInstanceOf(ObjectNotFoundException.class); } @@ -127,7 +127,7 @@ public interface BucketDumbBlobStoreContract { default void readBytesShouldThrowWhenBucketDoesNotExistWithBigData() { DumbBlobStore store = testee(); - Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, ELEVEN_KILOBYTES)).block(); + Mono.from(store.save(TEST_BUCKET_NAME, TEST_BLOB_ID, SHORT_BYTEARRAY)).block(); assertThatThrownBy(() -> Mono.from(store.readBytes(CUSTOM_BUCKET_NAME, TEST_BLOB_ID)).block()) .isInstanceOf(ObjectNotFoundException.class); diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java index 92e6756..09a6823 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/BlobTables.java @@ -49,7 +49,7 @@ public interface BlobTables { String DATA = "data"; } - interface DumbBlobCache { + interface BlobStoreCache { String TABLE_NAME = "blob_cache"; String ID = "id"; String DATA = "data"; diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java index 934812d..1975c1a 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraBlobModule.java @@ -19,7 +19,6 @@ package org.apache.james.blob.cassandra; -import static com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.HOURS; import static org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts.DATA; import org.apache.james.backends.cassandra.components.CassandraModule; @@ -29,7 +28,6 @@ import org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobParts; import org.apache.james.blob.cassandra.BlobTables.DefaultBucketBlobTable; import com.datastax.driver.core.DataType; -import com.datastax.driver.core.schemabuilder.SchemaBuilder; public interface CassandraBlobModule { CassandraModule MODULE = CassandraModule @@ -67,17 +65,5 @@ public interface CassandraBlobModule { .addPartitionKey(BucketBlobParts.ID, DataType.text()) .addClusteringColumn(BucketBlobTable.NUMBER_OF_CHUNK, DataType.cint())) - .table(BlobTables.DumbBlobCache.TABLE_NAME) - .options(options -> options - .compactionOptions(SchemaBuilder.timeWindowCompactionStrategy() - .compactionWindowSize(1) - .compactionWindowUnit(HOURS)) - .readRepairChance(0.0)) - .comment("Write through cache for small blobs stored in a slower blob store implementation which is object storage" + - "Messages` headers and bodies are stored as blobparts.") - .statement(statement -> statement - .addPartitionKey(BlobTables.DumbBlobCache.ID, DataType.text()) - .addColumn(BlobTables.DumbBlobCache.DATA, DataType.blob())) - .build(); } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java index 2e57325..240a514 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/CassandraDumbBlobStore.java @@ -21,6 +21,7 @@ package org.apache.james.blob.cassandra; import java.io.InputStream; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.NoSuchElementException; @@ -38,7 +39,9 @@ import org.apache.james.blob.cassandra.utils.DataChunker; import org.apache.james.util.ReactorUtils; import com.github.fge.lambdas.Throwing; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.io.ByteSource; import reactor.core.publisher.Flux; @@ -56,10 +59,11 @@ public class CassandraDumbBlobStore implements DumbBlobStore { private final BucketName defaultBucket; @Inject + @VisibleForTesting public CassandraDumbBlobStore(CassandraDefaultBucketDAO defaultBucketDAO, - CassandraBucketDAO bucketDAO, - CassandraConfiguration cassandraConfiguration, - @Named(DEFAULT_BUCKET) BucketName defaultBucket) { + CassandraBucketDAO bucketDAO, + CassandraConfiguration cassandraConfiguration, + @Named(DEFAULT_BUCKET) BucketName defaultBucket) { this.defaultBucketDAO = defaultBucketDAO; this.bucketDAO = bucketDAO; this.configuration = cassandraConfiguration; diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCache.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/BlobStoreCache.java similarity index 97% rename from server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCache.java rename to server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/BlobStoreCache.java index de60a33..9aec0b0 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCache.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/BlobStoreCache.java @@ -21,7 +21,7 @@ package org.apache.james.blob.cassandra.cache; import org.apache.james.blob.api.BlobId; import org.reactivestreams.Publisher; -public interface DumbBlobStoreCache { +public interface BlobStoreCache { Publisher<Void> cache(BlobId blobId, byte[] data); Publisher<byte[]> read(BlobId blobId); diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java new file mode 100644 index 0000000..0acc95e --- /dev/null +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CacheBlobStore.java @@ -0,0 +1,163 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.blob.cassandra.cache; + +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PushbackInputStream; + +import javax.inject.Inject; +import javax.inject.Named; + +import org.apache.commons.io.IOUtils; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.ObjectNotFoundException; +import org.apache.james.blob.api.ObjectStoreIOException; +import org.reactivestreams.Publisher; + +import com.google.common.base.Preconditions; + +import reactor.core.publisher.Mono; + +public class CacheBlobStore implements BlobStore { + + private static final String DEFAULT_BUCKET = "cassandraDefault"; + + private final BlobStoreCache cache; + private final BlobStore backend; + private final Integer sizeThresholdInBytes; + private final BucketName defaultBucket; + + @Inject + public CacheBlobStore(BlobStoreCache cache, BlobStore backend, + CassandraCacheConfiguration cacheConfiguration, + @Named(DEFAULT_BUCKET) BucketName defaultBucket) { + this.cache = cache; + this.backend = backend; + this.sizeThresholdInBytes = cacheConfiguration.getSizeThresholdInBytes(); + this.defaultBucket = defaultBucket; + } + + @Override + public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + Preconditions.checkNotNull(bucketName, "bucketName should not be null"); + + return Mono.just(bucketName) + .filter(defaultBucket::equals) + .flatMap(ignored -> + Mono.from(cache.read(blobId)) + .<InputStream>flatMap(bytes -> Mono.fromCallable(() -> new ByteArrayInputStream(bytes))) + ) + .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId))) + .blockOptional() + .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId))); + } + + @Override + public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { + return Mono.just(bucketName) + .filter(defaultBucket::equals) + .flatMap(ignored -> Mono.from(cache.read(blobId))) + .switchIfEmpty(Mono.from(backend.readBytes(bucketName, blobId))); + } + + @Override + public Mono<BlobId> save(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) { + return Mono.from(backend.save(bucketName, bytes, storagePolicy)) + .flatMap(blobId -> { + if (isAbleToCache(bucketName, bytes, storagePolicy)) { + return Mono.from(cache.cache(blobId, bytes)).thenReturn(blobId); + } + return Mono.just(blobId); + }); + } + + @Override + public Publisher<BlobId> save(BucketName bucketName, InputStream inputStream, StoragePolicy storagePolicy) { + Preconditions.checkNotNull(inputStream, "InputStream must not be null"); + + PushbackInputStream pushbackInputStream = new PushbackInputStream(inputStream, sizeThresholdInBytes + 1); + return Mono.from(backend.save(bucketName, pushbackInputStream, storagePolicy)) + .flatMap(blobId -> + Mono.fromCallable(() -> isALargeStream(pushbackInputStream)) + .flatMap(largeStream -> { + if (!largeStream) { + return Mono.from(saveInCache(bucketName, blobId, pushbackInputStream, storagePolicy)) + .thenReturn(blobId); + } + return Mono.just(blobId); + }) + ); + } + + @Override + public BucketName getDefaultBucketName() { + return defaultBucket; + } + + @Override + public Mono<Void> delete(BucketName bucketName, BlobId blobId) { + return Mono.from(backend.delete(bucketName, blobId)) + .then(Mono.just(bucketName) + .filter(defaultBucket::equals) + .flatMap(ignored -> Mono.from(cache.remove(blobId))) + .then()); + } + + @Override + public Publisher<Void> deleteBucket(BucketName bucketName) { + return Mono.from(backend.deleteBucket(bucketName)); + } + + private Mono<Void> saveInCache(BucketName bucketName, BlobId blobId, PushbackInputStream pushbackInputStream, StoragePolicy storagePolicy) { + return Mono.fromCallable(() -> copyBytesFromStream(pushbackInputStream)) + .filter(bytes -> isAbleToCache(bucketName, bytes, storagePolicy)) + .flatMap(bytes -> Mono.from(cache.cache(blobId, bytes))); + } + + private byte[] copyBytesFromStream(PushbackInputStream pushbackInputStream) throws IOException { + byte[] bytes = new byte[pushbackInputStream.available()]; + int read = IOUtils.read(pushbackInputStream, bytes); + pushbackInputStream.unread(read); + return bytes; + } + + private boolean isALargeStream(PushbackInputStream pushbackInputStream) throws IOException { + pushbackInputStream.mark(0); + long skip = pushbackInputStream.skip(sizeThresholdInBytes + 1); + pushbackInputStream.unread(Math.toIntExact(skip)); + return skip >= sizeThresholdInBytes; + } + + /** + * bytes: byte[] from PushbackInputStream.If PushbackInputStream is empty bytes.length == 1 + */ + private boolean isAbleToCache(BucketName bucketName, byte[] bytes, StoragePolicy storagePolicy) { + return isAbleToCache(bucketName, storagePolicy) && bytes.length <= sizeThresholdInBytes && bytes.length > 1; + } + + private boolean isAbleToCache(BucketName bucketName, StoragePolicy storagePolicy) { + return defaultBucket.equals(bucketName) && !storagePolicy.equals(LOW_COST); + } +} diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java deleted file mode 100644 index 53f3df2..0000000 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedDumbBlobStore.java +++ /dev/null @@ -1,80 +0,0 @@ -package org.apache.james.blob.cassandra.cache; - -import java.io.ByteArrayInputStream; -import java.io.InputStream; - -import javax.inject.Inject; - -import org.apache.james.blob.api.BlobId; -import org.apache.james.blob.api.BucketName; -import org.apache.james.blob.api.DumbBlobStore; -import org.apache.james.blob.api.ObjectNotFoundException; -import org.apache.james.blob.api.ObjectStoreIOException; -import org.reactivestreams.Publisher; - -import com.github.fge.lambdas.Throwing; -import com.google.common.base.Preconditions; -import com.google.common.io.ByteSource; - -import reactor.core.publisher.Mono; - -public class CachedDumbBlobStore implements DumbBlobStore { - - private final DumbBlobStoreCache cache; - private final DumbBlobStore backend; - - @Inject - public CachedDumbBlobStore(DumbBlobStoreCache cache, DumbBlobStore backend) { - this.cache = cache; - this.backend = backend; - } - - @Override - public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { - Preconditions.checkNotNull(bucketName, "bucketName should not be null"); - - return Mono.from(cache.read(blobId)) - .map(bytes -> (InputStream) new ByteArrayInputStream(bytes)) - .switchIfEmpty(Mono.fromCallable(() -> backend.read(bucketName, blobId))) - .blockOptional() - .orElseThrow(() -> new ObjectNotFoundException(String.format("Could not retrieve blob metadata for %s", blobId))); - } - - @Override - public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) { - return Mono.from(cache.read(blobId)) - .switchIfEmpty(Mono.from(backend.readBytes(bucketName, blobId))); - } - - @Override - public Publisher<Void> save(BucketName bucketName, BlobId blobId, byte[] data) { - return Mono.from(cache.cache(blobId, data)) - .then(Mono.from(backend.save(bucketName, blobId, data))); - } - - @Override - public Publisher<Void> save(BucketName bucketName, BlobId blobId, InputStream inputStream) { - return Mono.fromCallable(() -> inputStream) - .map(stream -> cache.cache(blobId, stream)) - .then(Mono.from(backend.save(bucketName, blobId, inputStream))); - } - - @Override - public Publisher<Void> save(BucketName bucketName, BlobId blobId, ByteSource content) { - return Mono.from(backend.save(bucketName, blobId, content)) - .then(Mono.using(content::openBufferedStream, - inputStream -> Mono.from(cache.cache(blobId, inputStream)), - Throwing.consumer(InputStream::close).sneakyThrow())); - } - - @Override - public Publisher<Void> delete(BucketName bucketName, BlobId blobId) { - return Mono.from(backend.delete(bucketName, blobId)) - .then(Mono.from(cache.remove(blobId))); - } - - @Override - public Publisher<Void> deleteBucket(BucketName bucketName) { - return Mono.from(backend.deleteBucket(bucketName)); - } -} diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobCacheModule.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobCacheModule.java similarity index 83% rename from server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobCacheModule.java rename to server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobCacheModule.java index 6d4734e..3f8fa12 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobCacheModule.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobCacheModule.java @@ -20,20 +20,22 @@ package org.apache.james.blob.cassandra.cache; import static com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.HOURS; +import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.DATA; +import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.ID; +import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.TABLE_NAME; import org.apache.james.backends.cassandra.components.CassandraModule; -import org.apache.james.blob.cassandra.BlobTables; import com.datastax.driver.core.DataType; import com.datastax.driver.core.schemabuilder.SchemaBuilder; -public interface CassandraDumbBlobCacheModule { +public interface CassandraBlobCacheModule { double NO_READ_REPAIR = 0d; CassandraModule MODULE = CassandraModule .builder() - .table(BlobTables.DumbBlobCache.TABLE_NAME) + .table(TABLE_NAME) .options(options -> options .compactionOptions(SchemaBuilder.timeWindowCompactionStrategy() .compactionWindowSize(1) @@ -41,7 +43,7 @@ public interface CassandraDumbBlobCacheModule { .readRepairChance(NO_READ_REPAIR)) .comment("Write through cache for small blobs stored in a slower blob store implementation.") .statement(statement -> statement - .addPartitionKey(BlobTables.DumbBlobCache.ID, DataType.text()) - .addColumn(BlobTables.DumbBlobCache.DATA, DataType.blob())) + .addPartitionKey(ID, DataType.text()) + .addColumn(DATA, DataType.blob())) .build(); } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCache.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java similarity index 92% rename from server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCache.java rename to server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java index 74bd73b..ce3237e 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCache.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCache.java @@ -28,9 +28,9 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl; import static org.apache.james.blob.cassandra.BlobTables.BucketBlobTable.ID; -import static org.apache.james.blob.cassandra.BlobTables.DumbBlobCache.DATA; -import static org.apache.james.blob.cassandra.BlobTables.DumbBlobCache.TABLE_NAME; -import static org.apache.james.blob.cassandra.BlobTables.DumbBlobCache.TTL_FOR_ROW; +import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.DATA; +import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.TABLE_NAME; +import static org.apache.james.blob.cassandra.BlobTables.BlobStoreCache.TTL_FOR_ROW; import java.nio.ByteBuffer; @@ -46,7 +46,7 @@ import com.google.common.annotations.VisibleForTesting; import reactor.core.publisher.Mono; -public class CassandraDumbBlobStoreCache implements DumbBlobStoreCache { +public class CassandraBlobStoreCache implements BlobStoreCache { private final CassandraAsyncExecutor cassandraAsyncExecutor; private final PreparedStatement insertStatement; @@ -58,7 +58,7 @@ public class CassandraDumbBlobStoreCache implements DumbBlobStoreCache { @Inject @VisibleForTesting - CassandraDumbBlobStoreCache(Session session, CassandraCacheConfiguration cacheConfiguration) { + CassandraBlobStoreCache(Session session, CassandraCacheConfiguration cacheConfiguration) { this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); this.insertStatement = prepareInsert(session); this.selectStatement = prepareSelect(session); diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCacheContract.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java similarity index 98% rename from server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCacheContract.java rename to server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java index e4101f6..6819aca 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/DumbBlobStoreCacheContract.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/BlobStoreCacheContract.java @@ -33,11 +33,11 @@ import com.google.common.base.Strings; import reactor.core.publisher.Mono; -public interface DumbBlobStoreCacheContract { +public interface BlobStoreCacheContract { byte[] EIGHT_KILOBYTES = Strings.repeat("01234567\n", 1024).getBytes(StandardCharsets.UTF_8); - DumbBlobStoreCache testee(); + BlobStoreCache testee(); BlobId.Factory blobIdFactory(); @@ -64,7 +64,6 @@ public interface DumbBlobStoreCacheContract { default void shouldReturnEmptyWhenReadWithTimeOut() { BlobId blobId = blobIdFactory().randomId(); Mono.from(testee().cache(blobId, EIGHT_KILOBYTES)).block(); - } @Test diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java new file mode 100644 index 0000000..1c5ff09 --- /dev/null +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CacheBlobStoreTest.java @@ -0,0 +1,163 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.blob.cassandra.cache; + +import static org.apache.james.blob.api.BlobStore.StoragePolicy.HIGH_PERFORMANCE; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED; +import static org.apache.james.blob.api.BucketName.DEFAULT; +import static org.apache.james.blob.cassandra.cache.BlobStoreCacheContract.EIGHT_KILOBYTES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.io.ByteArrayInputStream; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.components.CassandraModule; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobStoreContract; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.api.ObjectNotFoundException; +import org.apache.james.blob.cassandra.CassandraBlobModule; +import org.apache.james.blob.cassandra.CassandraBlobStore; +import org.assertj.core.api.SoftAssertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import reactor.core.publisher.Mono; + +public class CacheBlobStoreTest implements BlobStoreContract { + + private static final BucketName DEFAULT_BUCKERNAME = DEFAULT; + private static final BucketName TEST_BUCKERNAME = BucketName.of("test"); + + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension( + CassandraModule.aggregateModules(CassandraBlobModule.MODULE, CassandraBlobCacheModule.MODULE)); + + private BlobStore testee; + private BlobStore backend; + private BlobStoreCache cache; + + @BeforeEach + void setUp(CassandraCluster cassandra) { + backend = CassandraBlobStore.forTesting(cassandra.getConf()); + CassandraCacheConfiguration cacheConfig = new CassandraCacheConfiguration.Builder() + .sizeThresholdInBytes(EIGHT_KILOBYTES.length + 1) + .build(); + cache = new CassandraBlobStoreCache(cassandra.getConf(), cacheConfig); + testee = new CacheBlobStore(cache, backend, cacheConfig, DEFAULT); + } + + @Override + public BlobStore testee() { + return testee; + } + + @Override + public BlobId.Factory blobIdFactory() { + return new HashBlobId.Factory(); + } + + @Test + public void shouldCacheWhenDefaultBucketName() { + BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block(); + + byte[] actual = Mono.from(cache.read(blobId)).block(); + assertThat(actual).containsExactly(EIGHT_KILOBYTES); + } + + @Test + public void shouldNotCacheWhenNotDefaultBucketName() { + BlobId blobId = Mono.from(testee().save(TEST_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block(); + + SoftAssertions.assertSoftly(ignored -> { + assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); + assertThat(Mono.from(backend.readBytes(TEST_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES); + }); + } + + @Test + public void shouldNotCacheWhenDefaultBucketNameAndBigByteDataAndSizeBase() { + BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, TWELVE_MEGABYTES, SIZE_BASED)).block(); + + SoftAssertions.assertSoftly(ignored -> { + assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); + assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(TWELVE_MEGABYTES); + }); + } + + @Test + public void shouldSavedBothInCacheAndBackendWhenSizeBase() { + BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block(); + + SoftAssertions.assertSoftly(soflty -> { + assertThat(Mono.from(cache.read(blobId)).block()).containsExactly(EIGHT_KILOBYTES); + assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES); + }); + } + + @Test + public void shouldSavedBothInCacheAndBackendWhenHighPerformance() { + BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, HIGH_PERFORMANCE)).block(); + + SoftAssertions.assertSoftly(soflty -> { + assertThat(Mono.from(cache.read(blobId)).block()).containsExactly(EIGHT_KILOBYTES); + assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES); + }); + } + + @Test + public void shouldNotCacheWhenLowCost() { + BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, LOW_COST)).block(); + + SoftAssertions.assertSoftly(soflty -> { + assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); + assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EIGHT_KILOBYTES); + }); + } + + @Test + public void shouldNotCacheWhenEmptyStream() { + BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, new ByteArrayInputStream(EMPTY_BYTEARRAY), SIZE_BASED)).block(); + + SoftAssertions.assertSoftly(soflty -> { + assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); + assertThat(Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()).containsExactly(EMPTY_BYTEARRAY); + }); + } + + @Test + public void shouldRemoveBothInCacheAndBackendWhenDefaultBucketName() { + BlobId blobId = Mono.from(testee().save(DEFAULT_BUCKERNAME, EIGHT_KILOBYTES, SIZE_BASED)).block(); + + SoftAssertions.assertSoftly(ignored -> { + assertThatCode(Mono.from(testee().delete(DEFAULT_BUCKERNAME, blobId))::block) + .doesNotThrowAnyException(); + assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); + assertThatThrownBy(() -> Mono.from(backend.readBytes(DEFAULT_BUCKERNAME, blobId)).block()) + .isInstanceOf(ObjectNotFoundException.class); + }); + } +} diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCacheTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCacheTest.java similarity index 88% rename from server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCacheTest.java rename to server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCacheTest.java index 5b88799..efdc705 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraDumbBlobStoreCacheTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CassandraBlobStoreCacheTest.java @@ -27,16 +27,16 @@ import org.apache.james.blob.api.HashBlobId; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; -public class CassandraDumbBlobStoreCacheTest implements DumbBlobStoreCacheContract { +public class CassandraBlobStoreCacheTest implements BlobStoreCacheContract { @RegisterExtension - static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraDumbBlobCacheModule.MODULE); + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraBlobCacheModule.MODULE); private final Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(50); private final int DEFAULT_THRESHOLD_IN_BYTES = EIGHT_KILOBYTES.length; private final Duration _2_SEC_TTL = Duration.ofSeconds(2); - private DumbBlobStoreCache testee; + private BlobStoreCache testee; private HashBlobId.Factory blobIdFactory; @BeforeEach @@ -47,11 +47,11 @@ public class CassandraDumbBlobStoreCacheTest implements DumbBlobStoreCacheContra .timeOut(DEFAULT_READ_TIMEOUT) .ttl(_2_SEC_TTL) .build(); - testee = new CassandraDumbBlobStoreCache(cassandra.getConf(), cacheConfiguration); + testee = new CassandraBlobStoreCache(cassandra.getConf(), cacheConfiguration); } @Override - public DumbBlobStoreCache testee() { + public BlobStoreCache testee() { return testee; } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org