This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push: new 045a312072 JAMES-3925 deal with uploaded blobs directly with blobstoreDAO instea… (#2707) 045a312072 is described below commit 045a31207245d23c42a3c805520bf10a303cea33 Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Tue Apr 29 16:20:48 2025 +0700 JAMES-3925 deal with uploaded blobs directly with blobstoreDAO instea… (#2707) --- .../upload/CassandraUploadRepository.java | 24 ++++++++++------- ...sandraJMAPCurrentUploadUsageCalculatorTest.java | 7 +++-- .../upload/CassandraUploadRepositoryTest.java | 15 ++++++++--- .../upload/CassandraUploadServiceTest.java | 8 +++--- .../postgres/upload/PostgresUploadRepository.java | 25 ++++++++++------- .../upload/PostgresUploadRepositoryTest.java | 14 ++++++---- .../postgres/upload/PostgresUploadServiceTest.java | 6 +---- .../memory/upload/InMemoryUploadRepository.java | 31 +++++++++++++--------- .../jmap/api/upload/UploadRepositoryContract.scala | 19 +++++++++++-- ...MemoryJMAPCurrentUploadUsageCalculatorTest.java | 6 +---- .../upload/InMemoryUploadRepositoryTest.java | 14 ++++++---- .../memory/upload/InMemoryUploadServiceTest.java | 6 +---- .../webadmin/data/jmap/JmapUploadRoutesTest.java | 10 ++++--- 13 files changed, 110 insertions(+), 75 deletions(-) diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java index 9de6f27c02..833577dcf3 100644 --- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java +++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java @@ -18,7 +18,6 @@ ****************************************************************/ package org.apache.james.jmap.cassandra.upload; -import static 
org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY; import java.io.InputStream; @@ -29,7 +28,8 @@ import java.time.temporal.ChronoUnit; import jakarta.inject.Inject; -import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BucketName; import org.apache.james.core.Username; import org.apache.james.jmap.api.model.Upload; @@ -48,32 +48,36 @@ import reactor.core.publisher.Mono; public class CassandraUploadRepository implements UploadRepository { public static final BucketName UPLOAD_BUCKET = BucketName.of("jmap-uploads"); private final UploadDAO uploadDAO; - private final BlobStore blobStore; + private final BlobId.Factory blobIdFactory; + private final BlobStoreDAO blobStoreDAO; private final Clock clock; @Inject - public CassandraUploadRepository(UploadDAO uploadDAO, BlobStore blobStore, Clock clock) { + public CassandraUploadRepository(UploadDAO uploadDAO, BlobId.Factory blobIdFactory, BlobStoreDAO blobStoreDAO, Clock clock) { this.uploadDAO = uploadDAO; - this.blobStore = blobStore; + this.blobIdFactory = blobIdFactory; + this.blobStoreDAO = blobStoreDAO; this.clock = clock; } @Override public Mono<UploadMetaData> upload(InputStream data, ContentType contentType, Username user) { UploadId uploadId = generateId(); + BlobId blobId = blobIdFactory.of(uploadId.asString()); return Mono.fromCallable(() -> new CountingInputStream(data)) - .flatMap(countingInputStream -> Mono.from(blobStore.save(UPLOAD_BUCKET, countingInputStream, LOW_COST)) - .map(blobId -> new UploadDAO.UploadRepresentation(uploadId, blobId, contentType, countingInputStream.getCount(), user, + .flatMap(countingInputStream -> Mono.from(blobStoreDAO.save(UPLOAD_BUCKET, blobId, countingInputStream)) + .thenReturn(countingInputStream)) + .map(countingInputStream -> new UploadDAO.UploadRepresentation(uploadId, blobId, 
contentType, countingInputStream.getCount(), user, clock.instant().truncatedTo(ChronoUnit.MILLIS))) .flatMap(upload -> uploadDAO.save(upload) - .thenReturn(upload.toUploadMetaData()))); + .thenReturn(upload.toUploadMetaData())); } @Override public Mono<Upload> retrieve(UploadId id, Username user) { return uploadDAO.retrieve(user, id) - .flatMap(upload -> Mono.from(blobStore.readReactive(UPLOAD_BUCKET, upload.getBlobId(), LOW_COST)) + .flatMap(upload -> Mono.from(blobStoreDAO.readReactive(UPLOAD_BUCKET, upload.getBlobId())) .map(inputStream -> Upload.from(upload.toUploadMetaData(), () -> inputStream))) .switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id))); } @@ -94,7 +98,7 @@ public class CassandraUploadRepository implements UploadRepository { Instant expirationTime = clock.instant().minus(expireDuration); return Flux.from(uploadDAO.all()) .filter(upload -> upload.getUploadDate().isBefore(expirationTime)) - .flatMap(upload -> Mono.from(blobStore.delete(UPLOAD_BUCKET, upload.getBlobId())) + .flatMap(upload -> Mono.from(blobStoreDAO.delete(UPLOAD_BUCKET, upload.getBlobId())) .then(uploadDAO.delete(upload.getUser(), upload.getId())), DEFAULT_CONCURRENCY) .then(); } diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraJMAPCurrentUploadUsageCalculatorTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraJMAPCurrentUploadUsageCalculatorTest.java index 3b4d41b154..f4ebca24f9 100644 --- a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraJMAPCurrentUploadUsageCalculatorTest.java +++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraJMAPCurrentUploadUsageCalculatorTest.java @@ -25,14 +25,13 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraDataDefinition; import 
org.apache.james.backends.cassandra.components.CassandraMutualizedQuotaDataDefinition; import org.apache.james.backends.cassandra.components.CassandraQuotaCurrentValueDao; -import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.PlainBlobId; import org.apache.james.blob.memory.MemoryBlobStoreDAO; import org.apache.james.jmap.api.upload.JMAPCurrentUploadUsageCalculator; import org.apache.james.jmap.api.upload.JMAPCurrentUploadUsageCalculatorContract; import org.apache.james.jmap.api.upload.UploadRepository; import org.apache.james.jmap.api.upload.UploadUsageRepository; -import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; @@ -48,9 +47,9 @@ public class CassandraJMAPCurrentUploadUsageCalculatorTest implements JMAPCurren @BeforeEach private void setup() { Clock clock = Clock.systemUTC(); + BlobId.Factory blobIdFactory = new PlainBlobId.Factory(); uploadRepository = new CassandraUploadRepository(new UploadDAO(cassandraCluster.getCassandraCluster().getConf(), - new PlainBlobId.Factory()), new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.of("default"), new PlainBlobId.Factory()), - clock); + blobIdFactory), blobIdFactory, new MemoryBlobStoreDAO(), clock); uploadUsageRepository = new CassandraUploadUsageRepository(new CassandraQuotaCurrentValueDao(cassandraCluster.getCassandraCluster().getConf())); jmapCurrentUploadUsageCalculator = new JMAPCurrentUploadUsageCalculator(uploadRepository, uploadUsageRepository); } diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java index 14f1846576..98c90d12f5 100644 --- 
a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java +++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java @@ -22,13 +22,13 @@ package org.apache.james.jmap.cassandra.upload; import java.time.Clock; import org.apache.james.backends.cassandra.CassandraClusterExtension; -import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.PlainBlobId; import org.apache.james.blob.memory.MemoryBlobStoreDAO; import org.apache.james.jmap.api.model.UploadId; import org.apache.james.jmap.api.upload.UploadRepository; import org.apache.james.jmap.api.upload.UploadRepositoryContract; -import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore; import org.apache.james.utils.UpdatableTickingClock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -39,15 +39,17 @@ import com.datastax.oss.driver.api.core.uuid.Uuids; class CassandraUploadRepositoryTest implements UploadRepositoryContract { @RegisterExtension static CassandraClusterExtension cassandra = new CassandraClusterExtension(UploadDataDefinition.MODULE); + private BlobStoreDAO blobStoreDAO; private CassandraUploadRepository testee; private UpdatableTickingClock clock; @BeforeEach void setUp() { clock = new UpdatableTickingClock(Clock.systemUTC().instant()); + BlobId.Factory blobIdFactory = new PlainBlobId.Factory(); + blobStoreDAO = new MemoryBlobStoreDAO(); testee = new CassandraUploadRepository(new UploadDAO(cassandra.getCassandraCluster().getConf(), - new PlainBlobId.Factory()), new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.of("default"), new PlainBlobId.Factory()), - clock); + blobIdFactory), blobIdFactory, blobStoreDAO, clock); } @Override @@ -77,4 +79,9 @@ class CassandraUploadRepositoryTest implements 
UploadRepositoryContract { public UpdatableTickingClock clock() { return clock; } + + @Override + public BlobStoreDAO blobStoreDAO() { + return blobStoreDAO; + } } \ No newline at end of file diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadServiceTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadServiceTest.java index 1fad61d0bf..f5014b0612 100644 --- a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadServiceTest.java +++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadServiceTest.java @@ -26,7 +26,7 @@ import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraDataDefinition; import org.apache.james.backends.cassandra.components.CassandraMutualizedQuotaDataDefinition; import org.apache.james.backends.cassandra.components.CassandraQuotaCurrentValueDao; -import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.PlainBlobId; import org.apache.james.blob.memory.MemoryBlobStoreDAO; import org.apache.james.jmap.api.upload.UploadRepository; @@ -35,7 +35,6 @@ import org.apache.james.jmap.api.upload.UploadServiceContract; import org.apache.james.jmap.api.upload.UploadServiceDefaultImpl; import org.apache.james.jmap.api.upload.UploadUsageRepository; import org.apache.james.mailbox.cassandra.modules.CassandraMailboxQuotaDataDefinition; -import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; @@ -51,8 +50,9 @@ class CassandraUploadServiceTest implements UploadServiceContract { @BeforeEach void setUp(CassandraCluster cassandraCluster) { Clock clock = Clock.systemUTC(); - uploadRepository = new 
CassandraUploadRepository(new UploadDAO(cassandraCluster.getConf(), new PlainBlobId.Factory()), new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), - BucketName.of("default"), new PlainBlobId.Factory()), clock); + BlobId.Factory blobIdFactory = new PlainBlobId.Factory(); + uploadRepository = new CassandraUploadRepository(new UploadDAO(cassandraCluster.getConf(), + blobIdFactory), blobIdFactory, new MemoryBlobStoreDAO(), clock); uploadUsageRepository = new CassandraUploadUsageRepository(new CassandraQuotaCurrentValueDao(cassandraCluster.getConf())); testee = new UploadServiceDefaultImpl(uploadRepository, uploadUsageRepository, UploadServiceContract.TEST_CONFIGURATION()); } diff --git a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java index 35d2c7b86c..737f1f8efb 100644 --- a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java +++ b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java @@ -20,7 +20,6 @@ package org.apache.james.jmap.postgres.upload; import static org.apache.james.backends.postgres.PostgresCommons.INSTANT_TO_LOCAL_DATE_TIME; -import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST; import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY; import java.io.InputStream; @@ -31,7 +30,8 @@ import java.time.LocalDateTime; import jakarta.inject.Inject; import jakarta.inject.Singleton; -import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BucketName; import org.apache.james.core.Username; import org.apache.james.jmap.api.model.Upload; @@ -49,17 +49,19 @@ import reactor.core.publisher.Mono; public class 
PostgresUploadRepository implements UploadRepository { public static final BucketName UPLOAD_BUCKET = BucketName.of("jmap-uploads"); - private final BlobStore blobStore; + private final BlobId.Factory blobIdFactory; + private final BlobStoreDAO blobStoreDAO; private final Clock clock; private final PostgresUploadDAO.Factory uploadDAOFactory; private final PostgresUploadDAO byPassRLSUploadDAO; @Inject @Singleton - public PostgresUploadRepository(BlobStore blobStore, Clock clock, + public PostgresUploadRepository(BlobId.Factory blobIdFactory, BlobStoreDAO blobStoreDAO, Clock clock, PostgresUploadDAO.Factory uploadDAOFactory, PostgresUploadDAO byPassRLSUploadDAO) { - this.blobStore = blobStore; + this.blobIdFactory = blobIdFactory; + this.blobStoreDAO = blobStoreDAO; this.clock = clock; this.uploadDAOFactory = uploadDAOFactory; this.byPassRLSUploadDAO = byPassRLSUploadDAO; @@ -68,17 +70,20 @@ public class PostgresUploadRepository implements UploadRepository { @Override public Mono<UploadMetaData> upload(InputStream data, ContentType contentType, Username user) { UploadId uploadId = generateId(); + BlobId blobId = blobIdFactory.of(uploadId.asString()); PostgresUploadDAO uploadDAO = uploadDAOFactory.create(user.getDomainPart()); + return Mono.fromCallable(() -> new CountingInputStream(data)) - .flatMap(countingInputStream -> Mono.from(blobStore.save(UPLOAD_BUCKET, countingInputStream, LOW_COST)) - .map(blobId -> UploadMetaData.from(uploadId, contentType, countingInputStream.getCount(), blobId, clock.instant())) - .flatMap(uploadMetaData -> uploadDAO.insert(uploadMetaData, user))); + .flatMap(countingInputStream -> Mono.from(blobStoreDAO.save(UPLOAD_BUCKET, blobId, countingInputStream)) + .thenReturn(countingInputStream)) + .map(countingInputStream -> UploadMetaData.from(uploadId, contentType, countingInputStream.getCount(), blobId, clock.instant())) + .flatMap(uploadMetaData -> uploadDAO.insert(uploadMetaData, user)); } @Override public Mono<Upload> retrieve(UploadId 
id, Username user) { return uploadDAOFactory.create(user.getDomainPart()).get(id, user) - .flatMap(upload -> Mono.from(blobStore.readReactive(UPLOAD_BUCKET, upload.blobId(), LOW_COST)) + .flatMap(upload -> Mono.from(blobStoreDAO.readReactive(UPLOAD_BUCKET, upload.blobId())) .map(inputStream -> Upload.from(upload, () -> inputStream))) .switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id))); } @@ -101,7 +106,7 @@ public class PostgresUploadRepository implements UploadRepository { .flatMap(uploadPair -> { Username username = uploadPair.getRight(); UploadMetaData upload = uploadPair.getLeft(); - return Mono.from(blobStore.delete(UPLOAD_BUCKET, upload.blobId())) + return Mono.from(blobStoreDAO.delete(UPLOAD_BUCKET, upload.blobId())) .then(byPassRLSUploadDAO.delete(upload.uploadId(), username)); }, DEFAULT_CONCURRENCY) .then(); diff --git a/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepositoryTest.java b/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepositoryTest.java index d7b89094b7..80f2f48ec2 100644 --- a/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepositoryTest.java +++ b/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepositoryTest.java @@ -24,13 +24,11 @@ import java.time.Clock; import org.apache.james.backends.postgres.PostgresDataDefinition; import org.apache.james.backends.postgres.PostgresExtension; import org.apache.james.blob.api.BlobId; -import org.apache.james.blob.api.BlobStore; -import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.PlainBlobId; import org.apache.james.blob.memory.MemoryBlobStoreDAO; import org.apache.james.jmap.api.upload.UploadRepository; import org.apache.james.jmap.api.upload.UploadRepositoryContract; -import 
org.apache.james.server.blob.deduplication.DeDuplicationBlobStore; import org.apache.james.utils.UpdatableTickingClock; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; @@ -40,6 +38,7 @@ class PostgresUploadRepositoryTest implements UploadRepositoryContract { @RegisterExtension static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity( PostgresDataDefinition.aggregateModules(PostgresUploadDataDefinition.MODULE)); + private BlobStoreDAO blobStoreDAO; private UploadRepository testee; private UpdatableTickingClock clock; @@ -47,10 +46,10 @@ class PostgresUploadRepositoryTest implements UploadRepositoryContract { void setUp() { clock = new UpdatableTickingClock(Clock.systemUTC().instant()); BlobId.Factory blobIdFactory = new PlainBlobId.Factory(); - BlobStore blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory); PostgresUploadDAO uploadDAO = new PostgresUploadDAO(postgresExtension.getDefaultPostgresExecutor(), blobIdFactory); PostgresUploadDAO.Factory uploadFactory = new PostgresUploadDAO.Factory(blobIdFactory, postgresExtension.getExecutorFactory()); - testee = new PostgresUploadRepository(blobStore, clock, uploadFactory, uploadDAO); + blobStoreDAO = new MemoryBlobStoreDAO(); + testee = new PostgresUploadRepository(blobIdFactory, blobStoreDAO, clock, uploadFactory, uploadDAO); } @Override @@ -62,4 +61,9 @@ class PostgresUploadRepositoryTest implements UploadRepositoryContract { public UpdatableTickingClock clock() { return clock; } + + @Override + public BlobStoreDAO blobStoreDAO() { + return blobStoreDAO; + } } diff --git a/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadServiceTest.java b/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadServiceTest.java index 2c220e5769..81bf880be6 100644 --- 
a/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadServiceTest.java +++ b/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadServiceTest.java @@ -26,8 +26,6 @@ import org.apache.james.backends.postgres.PostgresExtension; import org.apache.james.backends.postgres.quota.PostgresQuotaCurrentValueDAO; import org.apache.james.backends.postgres.quota.PostgresQuotaDataDefinition; import org.apache.james.blob.api.BlobId; -import org.apache.james.blob.api.BlobStore; -import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.PlainBlobId; import org.apache.james.blob.memory.MemoryBlobStoreDAO; import org.apache.james.jmap.api.upload.UploadRepository; @@ -35,7 +33,6 @@ import org.apache.james.jmap.api.upload.UploadService; import org.apache.james.jmap.api.upload.UploadServiceContract; import org.apache.james.jmap.api.upload.UploadServiceDefaultImpl; import org.apache.james.jmap.api.upload.UploadUsageRepository; -import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; @@ -52,10 +49,9 @@ public class PostgresUploadServiceTest implements UploadServiceContract { @BeforeEach void setUp() { BlobId.Factory blobIdFactory = new PlainBlobId.Factory(); - BlobStore blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory); PostgresUploadDAO uploadDAO = new PostgresUploadDAO(postgresExtension.getDefaultPostgresExecutor(), blobIdFactory); PostgresUploadDAO.Factory uploadFactory = new PostgresUploadDAO.Factory(blobIdFactory, postgresExtension.getExecutorFactory()); - uploadRepository = new PostgresUploadRepository(blobStore, Clock.systemUTC(), uploadFactory, uploadDAO); + uploadRepository = new PostgresUploadRepository(blobIdFactory, new MemoryBlobStoreDAO(), Clock.systemUTC(), uploadFactory, uploadDAO); 
uploadUsageRepository = new PostgresUploadUsageRepository(new PostgresQuotaCurrentValueDAO(postgresExtension.getDefaultPostgresExecutor())); testee = new UploadServiceDefaultImpl(uploadRepository, uploadUsageRepository, UploadServiceContract.TEST_CONFIGURATION()); } diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepository.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepository.java index 9823214f98..c76b70c482 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepository.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepository.java @@ -31,7 +31,8 @@ import java.util.Map; import jakarta.inject.Inject; import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BucketName; import org.apache.james.core.Username; import org.apache.james.jmap.api.model.Upload; @@ -49,18 +50,20 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public class InMemoryUploadRepository implements UploadRepository { + private static final BucketName UPLOAD_BUCKET = BucketName.of("jmap-uploads"); private final Map<UploadId, ImmutablePair<Username, UploadMetaData>> uploadStore; - - private final BlobStore blobStore; + private final BlobId.Factory blobIdFactory; + private final BlobStoreDAO blobStoreDAO; private final BucketName bucketName; private final Clock clock; @Inject - public InMemoryUploadRepository(BlobStore blobStore, Clock clock) { - this.blobStore = blobStore; - this.bucketName = blobStore.getDefaultBucketName(); + public InMemoryUploadRepository(BlobId.Factory blobIdFactory, BlobStoreDAO blobStoreDAO, Clock clock) { + this.blobIdFactory = blobIdFactory; + this.blobStoreDAO = blobStoreDAO; + 
this.bucketName = UPLOAD_BUCKET; this.clock = clock; this.uploadStore = new HashMap<>(); } @@ -71,15 +74,17 @@ public class InMemoryUploadRepository implements UploadRepository { Preconditions.checkNotNull(contentType); Preconditions.checkNotNull(user); + UploadId uploadId = UploadId.random(); + BlobId blobId = blobIdFactory.of(uploadId.asString()); + return Mono.fromCallable(() -> new CountingInputStream(data)) - .flatMap(dataAsByte -> Mono.from(blobStore.save(bucketName, dataAsByte, BlobStore.StoragePolicy.LOW_COST)) - .map(blobId -> { - UploadId uploadId = UploadId.random(); + .flatMap(dataAsByte -> Mono.from(blobStoreDAO.save(bucketName, blobId, dataAsByte)) + .thenReturn(dataAsByte)) + .map(dataAsByte -> { Instant uploadDate = clock.instant(); uploadStore.put(uploadId, new ImmutablePair<>(user, UploadMetaData.from(uploadId, contentType, dataAsByte.getCount(), blobId, uploadDate))); return UploadMetaData.from(uploadId, contentType, dataAsByte.getCount(), blobId, uploadDate); - }) - ); + }); } @Override @@ -116,13 +121,13 @@ public class InMemoryUploadRepository implements UploadRepository { Instant expirationTime = clock.instant().minus(expireDuration); return Flux.fromIterable(List.copyOf(uploadStore.values())) .filter(pair -> pair.right.uploadDate().isBefore(expirationTime)) - .flatMap(pair -> Mono.from(blobStore.delete(bucketName, pair.right.blobId())) + .flatMap(pair -> Mono.from(blobStoreDAO.delete(bucketName, pair.right.blobId())) .then(Mono.fromRunnable(() -> uploadStore.remove(pair.right.uploadId())))) .then(); } private Mono<Upload> retrieveUpload(UploadMetaData uploadMetaData) { - return Mono.from(blobStore.readBytes(bucketName, uploadMetaData.blobId())) + return Mono.from(blobStoreDAO.readBytes(bucketName, uploadMetaData.blobId())) .map(content -> Upload.from(uploadMetaData, () -> new ByteArrayInputStream(content))); } } diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala index f544408968..a4759dade5 100644 --- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala +++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala @@ -21,14 +21,15 @@ import java.io.InputStream import java.nio.charset.StandardCharsets - import java.time.{Clock, Duration} + import java.time.Duration import java.util.UUID import org.apache.commons.io.IOUtils + import org.apache.james.blob.api.{BlobId, BlobStoreDAO, BucketName, ObjectNotFoundException} import org.apache.james.core.Username import org.apache.james.jmap.api.model.Size.sanitizeSize import org.apache.james.jmap.api.model.{Upload, UploadId, UploadMetaData, UploadNotFoundException} - import org.apache.james.jmap.api.upload.UploadRepositoryContract.{CONTENT_TYPE, DATA_STRING, USER} + import org.apache.james.jmap.api.upload.UploadRepositoryContract.{CONTENT_TYPE, DATA_STRING, UPLOAD_BUCKET, USER} import org.apache.james.mailbox.model.ContentType import org.apache.james.utils.UpdatableTickingClock import org.assertj.core.api.Assertions.{assertThat, assertThatCode, assertThatThrownBy} @@ -43,6 +44,7 @@ .of("text/html") private lazy val DATA_STRING: String = "123321" private lazy val USER: Username = Username.of("Bob") + private lazy val UPLOAD_BUCKET: BucketName = BucketName.of("jmap-uploads") } trait UploadRepositoryContract { @@ -53,6 +55,8 @@ def clock: UpdatableTickingClock + def blobStoreDAO: BlobStoreDAO + def data(): InputStream = IOUtils.toInputStream(DATA_STRING, StandardCharsets.UTF_8) @Test @@ -218,4 +222,15 @@ assertThat(SMono.fromPublisher(testee.retrieve(uploadId2, USER)).block()) .isNotNull } + + @Test + def deleteByUploadDateBeforeShouldRemoveExpiredUploadsFromBlobstore(): Unit = { + val blobId: BlobId = SMono.fromPublisher(testee.upload(data(), CONTENT_TYPE, USER)).block().blobId + 
clock.setInstant(clock.instant().plus(8, java.time.temporal.ChronoUnit.DAYS)) + + SMono(testee.deleteByUploadDateBefore(Duration.ofDays(7))).block(); + + assertThatThrownBy(() => blobStoreDAO.read(UPLOAD_BUCKET, blobId)) + .isInstanceOf(classOf[ObjectNotFoundException]) + } } diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryJMAPCurrentUploadUsageCalculatorTest.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryJMAPCurrentUploadUsageCalculatorTest.java index bddc9d808f..b0d7368980 100644 --- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryJMAPCurrentUploadUsageCalculatorTest.java +++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryJMAPCurrentUploadUsageCalculatorTest.java @@ -21,15 +21,12 @@ package org.apache.james.jmap.memory.upload; import java.time.Clock; -import org.apache.james.blob.api.BlobStore; -import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.PlainBlobId; import org.apache.james.blob.memory.MemoryBlobStoreDAO; import org.apache.james.jmap.api.upload.JMAPCurrentUploadUsageCalculator; import org.apache.james.jmap.api.upload.JMAPCurrentUploadUsageCalculatorContract; import org.apache.james.jmap.api.upload.UploadRepository; import org.apache.james.jmap.api.upload.UploadUsageRepository; -import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore; import org.junit.jupiter.api.BeforeEach; public class InMemoryJMAPCurrentUploadUsageCalculatorTest implements JMAPCurrentUploadUsageCalculatorContract { @@ -40,8 +37,7 @@ public class InMemoryJMAPCurrentUploadUsageCalculatorTest implements JMAPCurrent @BeforeEach private void setup() { - BlobStore blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, new PlainBlobId.Factory()); - uploadRepository = new InMemoryUploadRepository(blobStore, Clock.systemUTC()); + uploadRepository = new 
InMemoryUploadRepository(new PlainBlobId.Factory(), new MemoryBlobStoreDAO(), Clock.systemUTC()); uploadUsageRepository = new InMemoryUploadUsageRepository(); jmapCurrentUploadUsageCalculator = new JMAPCurrentUploadUsageCalculator(uploadRepository, uploadUsageRepository); } diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepositoryTest.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepositoryTest.java index d1f37e01ce..aa4b62effa 100644 --- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepositoryTest.java +++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepositoryTest.java @@ -21,26 +21,25 @@ package org.apache.james.jmap.memory.upload; import java.time.Clock; -import org.apache.james.blob.api.BlobStore; -import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.PlainBlobId; import org.apache.james.blob.memory.MemoryBlobStoreDAO; import org.apache.james.jmap.api.upload.UploadRepository; import org.apache.james.jmap.api.upload.UploadRepositoryContract; -import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore; import org.apache.james.utils.UpdatableTickingClock; import org.junit.jupiter.api.BeforeEach; public class InMemoryUploadRepositoryTest implements UploadRepositoryContract { + private BlobStoreDAO blobStoreDAO; private UploadRepository testee; private UpdatableTickingClock clock; @BeforeEach void setUp() { - BlobStore blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, new PlainBlobId.Factory()); clock = new UpdatableTickingClock(Clock.systemUTC().instant()); - testee = new InMemoryUploadRepository(blobStore, clock); + blobStoreDAO = new MemoryBlobStoreDAO(); + testee = new InMemoryUploadRepository(new PlainBlobId.Factory(), blobStoreDAO, clock); } @Override @@ -52,4 
+51,9 @@ public class InMemoryUploadRepositoryTest implements UploadRepositoryContract { public UpdatableTickingClock clock() { return clock; } + + @Override + public BlobStoreDAO blobStoreDAO() { + return blobStoreDAO; + } } diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadServiceTest.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadServiceTest.java index 628761c71d..15c7c160ac 100644 --- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadServiceTest.java +++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadServiceTest.java @@ -21,8 +21,6 @@ package org.apache.james.jmap.memory.upload; import java.time.Clock; -import org.apache.james.blob.api.BlobStore; -import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.PlainBlobId; import org.apache.james.blob.memory.MemoryBlobStoreDAO; import org.apache.james.jmap.api.upload.UploadRepository; @@ -30,7 +28,6 @@ import org.apache.james.jmap.api.upload.UploadService; import org.apache.james.jmap.api.upload.UploadServiceContract; import org.apache.james.jmap.api.upload.UploadServiceDefaultImpl; import org.apache.james.jmap.api.upload.UploadUsageRepository; -import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore; import org.junit.jupiter.api.BeforeEach; public class InMemoryUploadServiceTest implements UploadServiceContract { @@ -41,8 +38,7 @@ public class InMemoryUploadServiceTest implements UploadServiceContract { @BeforeEach void setUp() { - BlobStore blobStore = new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), BucketName.DEFAULT, new PlainBlobId.Factory()); - uploadRepository = new InMemoryUploadRepository(blobStore, Clock.systemUTC()); + uploadRepository = new InMemoryUploadRepository(new PlainBlobId.Factory(), new MemoryBlobStoreDAO(), Clock.systemUTC()); uploadUsageRepository = new 
InMemoryUploadUsageRepository(); testee = new UploadServiceDefaultImpl(uploadRepository, uploadUsageRepository, UploadServiceContract.TEST_CONFIGURATION()); } diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/JmapUploadRoutesTest.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/JmapUploadRoutesTest.java index bcf58aa651..3b1b8c8ba0 100644 --- a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/JmapUploadRoutesTest.java +++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/JmapUploadRoutesTest.java @@ -36,7 +36,9 @@ import java.util.Map; import org.apache.commons.io.IOUtils; import org.apache.james.backends.cassandra.CassandraClusterExtension; import org.apache.james.backends.cassandra.components.CassandraDataDefinition; +import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobStoreDAO; import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.PlainBlobId; import org.apache.james.blob.memory.MemoryBlobStoreDAO; @@ -97,12 +99,14 @@ class JmapUploadRoutesTest { void setUp() { taskManager = new MemoryTaskManager(new Hostname("foo")); clock = new UpdatableTickingClock(TIMESTAMP.toInstant()); - blobStore = new PassThroughBlobStore(new MemoryBlobStoreDAO(), + BlobStoreDAO blobStoreDAO = new MemoryBlobStoreDAO(); + BlobId.Factory blobIdFactory = new PlainBlobId.Factory(); + blobStore = new PassThroughBlobStore(blobStoreDAO, BucketName.of("default"), - new PlainBlobId.Factory()); + blobIdFactory); cassandraUploadRepository = new CassandraUploadRepository(new UploadDAO(cassandraCluster.getCassandraCluster().getConf(), - new PlainBlobId.Factory()), blobStore, clock); + blobIdFactory), blobIdFactory, blobStoreDAO, clock); JsonTransformer jsonTransformer = new JsonTransformer(); TasksRoutes tasksRoutes = new 
TasksRoutes(taskManager, jsonTransformer, DTOConverter.of(UploadCleanupTaskAdditionalInformationDTO.SERIALIZATION_MODULE)); --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org