This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2c2f2ea125ffe6d300487352aa0d34c7e9fe7213 Author: Benoit Tellier <[email protected]> AuthorDate: Thu Oct 22 08:45:00 2020 +0700 JAMES-3433 BlobStore reads should have a StoragePolicy associated Cached blob store only attempts cache reads when HIGH_PERFORMANCE is being used. --- .../cassandra/mail/CassandraAttachmentMapper.java | 2 +- .../cassandra/mail/CassandraMessageDAO.java | 12 ++-- .../cassandra/mail/CassandraMessageDAOV3.java | 12 ++-- .../vault/blob/BlobStoreDeletedMessageVault.java | 2 +- .../java/org/apache/james/blob/api/BlobStore.java | 8 +++ .../java/org/apache/james/blob/api/BlobType.java | 13 +++- .../blob/cassandra/cache/CachedBlobStore.java | 20 +++++- .../blob/cassandra/cache/CachedBlobStoreTest.java | 71 +++++++++++----------- .../main/java/org/apache/james/blob/api/Store.java | 2 +- .../export/file/LocalFileBlobExportMechanism.java | 4 +- .../apache/james/blob/mail/MimeMessagePartsId.java | 7 ++- .../linshare/LinshareBlobExportMechanism.java | 4 +- 12 files changed, 97 insertions(+), 60 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java index dd6edec..9db614a 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java @@ -96,7 +96,7 @@ public class CassandraAttachmentMapper implements AttachmentMapper { @Override public InputStream loadAttachmentContent(AttachmentId attachmentId) throws AttachmentNotFoundException, IOException { return attachmentDAOV2.getAttachment(attachmentId) - .map(daoAttachment -> blobStore.read(blobStore.getDefaultBucketName(), daoAttachment.getBlobId())) + .map(daoAttachment -> blobStore.read(blobStore.getDefaultBucketName(), daoAttachment.getBlobId(), LOW_COST)) 
.blockOptional() .orElseThrow(() -> new AttachmentNotFoundException(attachmentId.toString())); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java index 8d97ff5..98b22d6 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAO.java @@ -319,9 +319,9 @@ public class CassandraMessageDAO { case Full: return getFullContent(headerId, bodyId); case Headers: - return getContent(headerId); + return getContent(headerId, SIZE_BASED); case Body: - return getContent(bodyId) + return getContent(bodyId, LOW_COST) .map(data -> Bytes.concat(new byte[bodyStartOctet], data)); case Metadata: return Mono.just(EMPTY_BYTE_ARRAY); @@ -331,12 +331,12 @@ public class CassandraMessageDAO { } private Mono<byte[]> getFullContent(BlobId headerId, BlobId bodyId) { - return getContent(headerId) - .zipWith(getContent(bodyId), Bytes::concat); + return getContent(headerId, SIZE_BASED) + .zipWith(getContent(bodyId, LOW_COST), Bytes::concat); } - private Mono<byte[]> getContent(BlobId blobId) { - return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId)); + private Mono<byte[]> getContent(BlobId blobId, BlobStore.StoragePolicy storagePolicy) { + return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId, storagePolicy)); } private BlobId retrieveBlobId(String field, Row row) { diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java index 8f349da..c665a85 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java +++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageDAOV3.java @@ -346,9 +346,9 @@ public class CassandraMessageDAOV3 { case Full: return getFullContent(headerId, bodyId); case Headers: - return getContent(headerId); + return getContent(headerId, SIZE_BASED); case Body: - return getContent(bodyId) + return getContent(bodyId, LOW_COST) .map(data -> Bytes.concat(new byte[bodyStartOctet], data)); case Metadata: return Mono.just(EMPTY_BYTE_ARRAY); @@ -358,12 +358,12 @@ public class CassandraMessageDAOV3 { } private Mono<byte[]> getFullContent(BlobId headerId, BlobId bodyId) { - return getContent(headerId) - .zipWith(getContent(bodyId), Bytes::concat); + return getContent(headerId, SIZE_BASED) + .zipWith(getContent(bodyId, LOW_COST), Bytes::concat); } - private Mono<byte[]> getContent(BlobId blobId) { - return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId)); + private Mono<byte[]> getContent(BlobId blobId, BlobStore.StoragePolicy storagePolicy) { + return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), blobId, storagePolicy)); } private BlobId retrieveBlobId(String field, Row row) { diff --git a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java index d61247b..e10cd2b 100644 --- a/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java +++ b/mailbox/plugin/deleted-messages-vault/src/main/java/org/apache/james/vault/blob/BlobStoreDeletedMessageVault.java @@ -122,7 +122,7 @@ public class BlobStoreDeletedMessageVault implements DeletedMessageVault { } private Mono<InputStream> loadMimeMessage(StorageInformation storageInformation, Username username, MessageId messageId) { - return Mono.fromSupplier(() -> blobStore.read(storageInformation.getBucketName(), 
storageInformation.getBlobId())) + return Mono.fromSupplier(() -> blobStore.read(storageInformation.getBucketName(), storageInformation.getBlobId(), LOW_COST)) .onErrorResume( ObjectNotFoundException.class, ex -> Mono.error(new DeletedMessageContentNotFoundException(username, messageId))); diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java index a887a32..dc4abef 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobStore.java @@ -43,6 +43,14 @@ public interface BlobStore { InputStream read(BucketName bucketName, BlobId blobId); + default Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId, StoragePolicy storagePolicy) { + return readBytes(bucketName, blobId); + } + + default InputStream read(BucketName bucketName, BlobId blobId, StoragePolicy storagePolicy) { + return read(bucketName, blobId); + } + BucketName getDefaultBucketName(); Publisher<Void> deleteBucket(BucketName bucketName); diff --git a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobType.java b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobType.java index e068c68..ecc0d61 100644 --- a/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobType.java +++ b/server/blob/blob-api/src/main/java/org/apache/james/blob/api/BlobType.java @@ -23,27 +23,34 @@ import java.util.Objects; public class BlobType { private final String name; + private final BlobStore.StoragePolicy storagePolicy; - public BlobType(String name) { + public BlobType(String name, BlobStore.StoragePolicy storagePolicy) { this.name = name; + this.storagePolicy = storagePolicy; } public String getName() { return name; } + public BlobStore.StoragePolicy getStoragePolicy() { + return storagePolicy; + } + @Override public final boolean equals(Object o) { if (o instanceof 
BlobType) { BlobType blobType = (BlobType) o; - return Objects.equals(this.name, blobType.name); + return Objects.equals(this.name, blobType.name) + && Objects.equals(this.storagePolicy, blobType.storagePolicy); } return false; } @Override public final int hashCode() { - return Objects.hash(name); + return Objects.hash(name, storagePolicy); } } diff --git a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java index 0e998c0..87fdfa5 100644 --- a/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java +++ b/server/blob/blob-cassandra/src/main/java/org/apache/james/blob/cassandra/cache/CachedBlobStore.java @@ -132,7 +132,10 @@ public class CachedBlobStore implements BlobStore { } @Override - public InputStream read(BucketName bucketName, BlobId blobId) throws ObjectStoreIOException, ObjectNotFoundException { + public InputStream read(BucketName bucketName, BlobId blobId, StoragePolicy storagePolicy) throws ObjectStoreIOException, ObjectNotFoundException { + if (storagePolicy == LOW_COST) { + return backend.read(bucketName, blobId); + } return Mono.just(bucketName) .filter(getDefaultBucketName()::equals) .flatMap(defaultBucket -> readInDefaultBucket(bucketName, blobId)) @@ -152,13 +155,26 @@ public class CachedBlobStore implements BlobStore { } @Override - public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId) { + public Mono<byte[]> readBytes(BucketName bucketName, BlobId blobId, StoragePolicy storagePolicy) { + if (storagePolicy == LOW_COST) { + return readBytesFromBackend(bucketName, blobId); + } if (getDefaultBucketName().equals(bucketName)) { return readBytesInDefaultBucket(bucketName, blobId); } return readBytesFromBackend(bucketName, blobId); } + @Override + public Publisher<byte[]> readBytes(BucketName bucketName, BlobId blobId) { + return 
readBytes(bucketName, blobId, LOW_COST); + } + + @Override + public InputStream read(BucketName bucketName, BlobId blobId) { + return read(bucketName, blobId, LOW_COST); + } + private Mono<byte[]> readBytesInDefaultBucket(BucketName bucketName, BlobId blobId) { return readFromCache(blobId).switchIfEmpty( readBytesFromBackend(bucketName, blobId) diff --git a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java index bf0a405..580f910 100644 --- a/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java +++ b/server/blob/blob-cassandra/src/test/java/org/apache/james/blob/cassandra/cache/CachedBlobStoreTest.java @@ -216,7 +216,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { SoftAssertions.assertSoftly(soflty -> { soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); - soflty.assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId)).block())) + soflty.assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block())) .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES)); soflty.assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())) .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES)); @@ -229,7 +229,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { SoftAssertions.assertSoftly(soflty -> { soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); - soflty.assertThat(testee().read(DEFAULT_BUCKETNAME, blobId)) + soflty.assertThat(testee().read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)) .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES)); soflty.assertThat(new ByteArrayInputStream(Mono.from(cache.read(blobId)).block())) 
.hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES)); @@ -242,7 +242,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { SoftAssertions.assertSoftly(soflty -> { soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); - soflty.assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(TEST_BUCKETNAME, blobId)).block())) + soflty.assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block())) .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES)); soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); }); @@ -254,7 +254,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { SoftAssertions.assertSoftly(soflty -> { soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); - soflty.assertThat(testee().read(TEST_BUCKETNAME, blobId)) + soflty.assertThat(testee().read(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE)) .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES)); soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); }); @@ -266,7 +266,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { SoftAssertions.assertSoftly(soflty -> { soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); - soflty.assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId)).block())) + soflty.assertThat(new ByteArrayInputStream(Mono.from(testee().readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block())) .hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES)); soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); }); @@ -278,7 +278,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { SoftAssertions.assertSoftly(soflty -> { soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); - 
soflty.assertThat(testee().read(DEFAULT_BUCKETNAME, blobId)) + soflty.assertThat(testee().read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)) .hasSameContentAs(new ByteArrayInputStream(TWELVE_MEGABYTES)); soflty.assertThat(Mono.from(cache.read(blobId)).blockOptional()).isEmpty(); }); @@ -294,7 +294,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { Mono.from(cache.read(blobId)).block(); - assertThat(testee().read(DEFAULT_BUCKETNAME, blobId)) + assertThat(testee().read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)) .hasSameContentAs(new ByteArrayInputStream(APPROXIMATELY_FIVE_KILOBYTES)); } @@ -304,8 +304,8 @@ public class CachedBlobStoreTest implements BlobStoreContract { void readBlobStoreCacheWithNoneDefaultBucketNameShouldNotImpact() { BlobId blobId = Mono.from(testee.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); - testee.read(TEST_BUCKETNAME, blobId); - testee.read(TEST_BUCKETNAME, blobId); + testee.read(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE); + testee.read(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE); SoftAssertions.assertSoftly(soflty -> { soflty.assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME)) @@ -324,8 +324,8 @@ public class CachedBlobStoreTest implements BlobStoreContract { void readBlobStoreWithNoneDefaultBucketNameShouldRecordByBackendLatency() { BlobId blobId = Mono.from(testee.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); - testee.read(TEST_BUCKETNAME, blobId); - testee.read(TEST_BUCKETNAME, blobId); + testee.read(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE); + testee.read(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE); SoftAssertions.assertSoftly(soflty -> soflty.assertThat(metricFactory.executionTimesFor(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME)) @@ -337,9 +337,8 @@ public class CachedBlobStoreTest implements BlobStoreContract { void readBytesWithNoneDefaultBucketNameShouldNotImpact() { BlobId blobId = Mono.from(testee.save(TEST_BUCKETNAME, 
APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); - Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId)).block(); - Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId)).block(); - + Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); + Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); SoftAssertions.assertSoftly(soflty -> { assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME)) @@ -361,8 +360,8 @@ public class CachedBlobStoreTest implements BlobStoreContract { void readBytesWithNoneDefaultBucketNameShouldPublishBackendTimerMetrics() { BlobId blobId = Mono.from(testee.save(TEST_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); - Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId)).block(); - Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId)).block(); + Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); + Mono.from(testee.readBytes(TEST_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); SoftAssertions.assertSoftly(soflty -> soflty.assertThat(metricFactory.executionTimesFor(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME)) @@ -374,8 +373,8 @@ public class CachedBlobStoreTest implements BlobStoreContract { void readBlobStoreCacheShouldPublishTimerMetrics() { BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); - testee.read(DEFAULT_BUCKETNAME, blobId); - testee.read(DEFAULT_BUCKETNAME, blobId); + testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE); + testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE); SoftAssertions.assertSoftly(soflty -> { soflty.assertThat(metricFactory.executionTimesFor(BLOBSTORE_CACHED_LATENCY_METRIC_NAME)) @@ -388,8 +387,8 @@ public class CachedBlobStoreTest implements BlobStoreContract { void readBytesCacheShouldPublishTimerMetrics() { BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); - 
Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block(); - Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block(); + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); SoftAssertions.assertSoftly(soflty -> { soflty.assertThat(metricFactory.executionTimesFor(BLOBSTORE_CACHED_LATENCY_METRIC_NAME)) @@ -405,8 +404,8 @@ public class CachedBlobStoreTest implements BlobStoreContract { void readBytesShouldPublishBackendTimerMetricsForBigBlobs() { BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, ELEVEN_KILOBYTES, SIZE_BASED)).block(); - Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block(); - Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block(); + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); SoftAssertions.assertSoftly(soflty -> soflty.assertThat(metricFactory.executionTimesFor(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME)) @@ -418,8 +417,8 @@ public class CachedBlobStoreTest implements BlobStoreContract { void readInputStreamShouldPublishBackendTimerForBigBlobs() { BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, ELEVEN_KILOBYTES, SIZE_BASED)).block(); - testee.read(DEFAULT_BUCKETNAME, blobId); - testee.read(DEFAULT_BUCKETNAME, blobId); + testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE); + testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE); SoftAssertions.assertSoftly(soflty -> soflty.assertThat(metricFactory.executionTimesFor(BLOBSTORE_BACKEND_LATENCY_METRIC_NAME)) @@ -431,8 +430,8 @@ public class CachedBlobStoreTest implements BlobStoreContract { void readBytesShouldNotIncreaseCacheCounterForBigBlobs() { BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, ELEVEN_KILOBYTES, SIZE_BASED)).block(); - Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block(); - 
Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block(); + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); SoftAssertions.assertSoftly(soflty -> { soflty.assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME)) @@ -466,7 +465,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, ELEVEN_KILOBYTES, SIZE_BASED)).block(); Duration delay = Duration.ofMillis(500); - Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)) + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)) .then(Mono.delay(delay)) .repeat(2) .blockLast(); @@ -481,7 +480,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); Duration delay = Duration.ofMillis(500); - Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)) + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)) .then(Mono.delay(delay)) .repeat(2) .blockLast(); @@ -495,8 +494,8 @@ public class CachedBlobStoreTest implements BlobStoreContract { void readBlobStoreCacheShouldCountWhenHit() { BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); - testee.read(DEFAULT_BUCKETNAME, blobId); - testee.read(DEFAULT_BUCKETNAME, blobId); + testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE); + testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE); assertThat(metricFactory.countFor(BLOBSTORE_CACHED_HIT_COUNT_METRIC_NAME)).isEqualTo(2); } @@ -505,8 +504,8 @@ public class CachedBlobStoreTest implements BlobStoreContract { void readBytesCacheShouldCountWhenHit() { BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); - 
Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block(); - Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block(); + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); assertThat(metricFactory.countFor(BLOBSTORE_CACHED_HIT_COUNT_METRIC_NAME)).isEqualTo(2); } @@ -517,7 +516,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { BlobId blobId = Mono.from(backend.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); Mono.from(cache.remove(blobId)).block(); - testee.read(DEFAULT_BUCKETNAME, blobId); + testee.read(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE); assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME)).isEqualTo(1); } @@ -527,7 +526,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { BlobId blobId = Mono.from(testee.save(DEFAULT_BUCKETNAME, APPROXIMATELY_FIVE_KILOBYTES, SIZE_BASED)).block(); Mono.from(cache.remove(blobId)).block(); - Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId)).block(); + Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, blobId, HIGH_PERFORMANCE)).block(); assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME)).isEqualTo(1); } @@ -535,7 +534,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { @Test void metricsShouldNotWorkExceptLatencyWhenReadNonExistingBlob() { SoftAssertions.assertSoftly(soflty -> { - soflty.assertThatThrownBy(() -> testee.read(DEFAULT_BUCKETNAME, new TestBlobId.Factory().randomId())) + soflty.assertThatThrownBy(() -> testee.read(DEFAULT_BUCKETNAME, new TestBlobId.Factory().randomId(), HIGH_PERFORMANCE)) .isInstanceOf(ObjectNotFoundException.class); soflty.assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME)) @@ -556,7 +555,7 @@ public class CachedBlobStoreTest implements BlobStoreContract { @Test void 
metricsShouldNotWorkExceptLatencyWhenReadNonExistingBlobAsBytes() { SoftAssertions.assertSoftly(soflty -> { - soflty.assertThatThrownBy(() -> Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, new TestBlobId.Factory().randomId())).blockOptional()) + soflty.assertThatThrownBy(() -> Mono.from(testee.readBytes(DEFAULT_BUCKETNAME, new TestBlobId.Factory().randomId(), HIGH_PERFORMANCE)).blockOptional()) .isInstanceOf(ObjectNotFoundException.class); soflty.assertThat(metricFactory.countFor(BLOBSTORE_CACHED_MISS_COUNT_METRIC_NAME)) diff --git a/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java b/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java index 8deba51..9c20cba 100644 --- a/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java +++ b/server/blob/blob-common/src/main/java/org/apache/james/blob/api/Store.java @@ -97,7 +97,7 @@ public interface Store<T, I> { return Flux.fromIterable(blobIds.asMap().entrySet()) .publishOn(Schedulers.elastic()) .flatMapSequential( - entry -> Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), entry.getValue())) + entry -> Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), entry.getValue(), entry.getKey().getStoragePolicy())) .zipWith(Mono.just(entry.getKey()))) .map(entry -> Pair.of(entry.getT2(), entry.getT1())) .collectList() diff --git a/server/blob/blob-export-file/src/main/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanism.java b/server/blob/blob-export-file/src/main/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanism.java index a2968ea..e6aec2b 100644 --- a/server/blob/blob-export-file/src/main/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanism.java +++ b/server/blob/blob-export-file/src/main/java/org/apache/james/blob/export/file/LocalFileBlobExportMechanism.java @@ -19,6 +19,8 @@ package org.apache.james.blob.export.file; +import static 
org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; + import java.io.File; import java.io.IOException; import java.net.UnknownHostException; @@ -121,7 +123,7 @@ public class LocalFileBlobExportMechanism implements BlobExportMechanism { String fileName = ExportedFileNamesGenerator.generateFileName(fileCustomPrefix, blobId, fileExtension); String fileURL = configuration.exportDirectory + "/" + fileName; File file = fileSystem.getFile(fileURL); - FileUtils.copyToFile(blobStore.read(blobStore.getDefaultBucketName(), blobId), file); + FileUtils.copyToFile(blobStore.read(blobStore.getDefaultBucketName(), blobId, LOW_COST), file); return file.getAbsolutePath(); } catch (IOException e) { diff --git a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java index 936efe5..935a9b7 100644 --- a/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java +++ b/server/blob/mail-store/src/main/java/org/apache/james/blob/mail/MimeMessagePartsId.java @@ -19,6 +19,9 @@ package org.apache.james.blob.mail; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.SIZE_BASED; + import java.util.Map; import java.util.Objects; @@ -75,8 +78,8 @@ public class MimeMessagePartsId implements BlobPartsId { } } - static final BlobType HEADER_BLOB_TYPE = new BlobType("mailHeader"); - static final BlobType BODY_BLOB_TYPE = new BlobType("mailBody"); + static final BlobType HEADER_BLOB_TYPE = new BlobType("mailHeader", SIZE_BASED); + static final BlobType BODY_BLOB_TYPE = new BlobType("mailBody", LOW_COST); private final BlobId headerBlobId; private final BlobId bodyBlobId; diff --git a/third-party/linshare/src/main/java/org/apache/james/linshare/LinshareBlobExportMechanism.java 
b/third-party/linshare/src/main/java/org/apache/james/linshare/LinshareBlobExportMechanism.java index 92c280a..7f39129 100644 --- a/third-party/linshare/src/main/java/org/apache/james/linshare/LinshareBlobExportMechanism.java +++ b/third-party/linshare/src/main/java/org/apache/james/linshare/LinshareBlobExportMechanism.java @@ -19,6 +19,8 @@ package org.apache.james.linshare; +import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; + import java.io.File; import java.io.IOException; import java.util.Optional; @@ -67,7 +69,7 @@ public class LinshareBlobExportMechanism implements BlobExportMechanism { String fileName = ExportedFileNamesGenerator.generateFileName(fileCustomPrefix, blobId, fileExtension); File tempFile = new File(tempDir, fileName); try { - FileUtils.copyInputStreamToFile(blobStore.read(blobStore.getDefaultBucketName(), blobId), tempFile); + FileUtils.copyInputStreamToFile(blobStore.read(blobStore.getDefaultBucketName(), blobId, LOW_COST), tempFile); uploadDocumentToTargetMail(mailAddress, tempFile); } finally { FileUtils.forceDelete(tempFile); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
