This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 045a312072 JAMES-3925 deal with uploaded blobs directly with 
blobstoreDAO instea… (#2707)
045a312072 is described below

commit 045a31207245d23c42a3c805520bf10a303cea33
Author: Rene Cordier <rcord...@linagora.com>
AuthorDate: Tue Apr 29 16:20:48 2025 +0700

    JAMES-3925 deal with uploaded blobs directly with blobstoreDAO instea… 
(#2707)
---
 .../upload/CassandraUploadRepository.java          | 24 ++++++++++-------
 ...sandraJMAPCurrentUploadUsageCalculatorTest.java |  7 +++--
 .../upload/CassandraUploadRepositoryTest.java      | 15 ++++++++---
 .../upload/CassandraUploadServiceTest.java         |  8 +++---
 .../postgres/upload/PostgresUploadRepository.java  | 25 ++++++++++-------
 .../upload/PostgresUploadRepositoryTest.java       | 14 ++++++----
 .../postgres/upload/PostgresUploadServiceTest.java |  6 +----
 .../memory/upload/InMemoryUploadRepository.java    | 31 +++++++++++++---------
 .../jmap/api/upload/UploadRepositoryContract.scala | 19 +++++++++++--
 ...MemoryJMAPCurrentUploadUsageCalculatorTest.java |  6 +----
 .../upload/InMemoryUploadRepositoryTest.java       | 14 ++++++----
 .../memory/upload/InMemoryUploadServiceTest.java   |  6 +----
 .../webadmin/data/jmap/JmapUploadRoutesTest.java   | 10 ++++---
 13 files changed, 110 insertions(+), 75 deletions(-)

diff --git 
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java
 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java
index 9de6f27c02..833577dcf3 100644
--- 
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java
+++ 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java
@@ -18,7 +18,6 @@
  ****************************************************************/
 package org.apache.james.jmap.cassandra.upload;
 
-import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.io.InputStream;
@@ -29,7 +28,8 @@ import java.time.temporal.ChronoUnit;
 
 import jakarta.inject.Inject;
 
-import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.core.Username;
 import org.apache.james.jmap.api.model.Upload;
@@ -48,32 +48,36 @@ import reactor.core.publisher.Mono;
 public class CassandraUploadRepository implements UploadRepository {
     public static final BucketName UPLOAD_BUCKET = 
BucketName.of("jmap-uploads");
     private final UploadDAO uploadDAO;
-    private final BlobStore blobStore;
+    private final BlobId.Factory blobIdFactory;
+    private final BlobStoreDAO blobStoreDAO;
     private final Clock clock;
 
     @Inject
-    public CassandraUploadRepository(UploadDAO uploadDAO, BlobStore blobStore, 
Clock clock) {
+    public CassandraUploadRepository(UploadDAO uploadDAO, BlobId.Factory 
blobIdFactory, BlobStoreDAO blobStoreDAO, Clock clock) {
         this.uploadDAO = uploadDAO;
-        this.blobStore = blobStore;
+        this.blobIdFactory = blobIdFactory;
+        this.blobStoreDAO = blobStoreDAO;
         this.clock = clock;
     }
 
     @Override
     public Mono<UploadMetaData> upload(InputStream data, ContentType 
contentType, Username user) {
         UploadId uploadId = generateId();
+        BlobId blobId = blobIdFactory.of(uploadId.asString());
 
         return Mono.fromCallable(() -> new CountingInputStream(data))
-            .flatMap(countingInputStream -> 
Mono.from(blobStore.save(UPLOAD_BUCKET, countingInputStream, LOW_COST))
-                .map(blobId -> new UploadDAO.UploadRepresentation(uploadId, 
blobId, contentType, countingInputStream.getCount(), user,
+            .flatMap(countingInputStream -> 
Mono.from(blobStoreDAO.save(UPLOAD_BUCKET, blobId, countingInputStream))
+                    .thenReturn(countingInputStream))
+                .map(countingInputStream -> new 
UploadDAO.UploadRepresentation(uploadId, blobId, contentType, 
countingInputStream.getCount(), user,
                     clock.instant().truncatedTo(ChronoUnit.MILLIS)))
                 .flatMap(upload -> uploadDAO.save(upload)
-                    .thenReturn(upload.toUploadMetaData())));
+                    .thenReturn(upload.toUploadMetaData()));
     }
 
     @Override
     public Mono<Upload> retrieve(UploadId id, Username user) {
         return uploadDAO.retrieve(user, id)
-            .flatMap(upload -> Mono.from(blobStore.readReactive(UPLOAD_BUCKET, 
upload.getBlobId(), LOW_COST))
+            .flatMap(upload -> 
Mono.from(blobStoreDAO.readReactive(UPLOAD_BUCKET, upload.getBlobId()))
                 .map(inputStream -> Upload.from(upload.toUploadMetaData(), () 
-> inputStream)))
             .switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id)));
     }
@@ -94,7 +98,7 @@ public class CassandraUploadRepository implements 
UploadRepository {
         Instant expirationTime = clock.instant().minus(expireDuration);
         return Flux.from(uploadDAO.all())
             .filter(upload -> upload.getUploadDate().isBefore(expirationTime))
-            .flatMap(upload -> Mono.from(blobStore.delete(UPLOAD_BUCKET, 
upload.getBlobId()))
+            .flatMap(upload -> Mono.from(blobStoreDAO.delete(UPLOAD_BUCKET, 
upload.getBlobId()))
                 .then(uploadDAO.delete(upload.getUser(), upload.getId())), 
DEFAULT_CONCURRENCY)
             .then();
     }
diff --git 
a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraJMAPCurrentUploadUsageCalculatorTest.java
 
b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraJMAPCurrentUploadUsageCalculatorTest.java
index 3b4d41b154..f4ebca24f9 100644
--- 
a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraJMAPCurrentUploadUsageCalculatorTest.java
+++ 
b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraJMAPCurrentUploadUsageCalculatorTest.java
@@ -25,14 +25,13 @@ import 
org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
 import 
org.apache.james.backends.cassandra.components.CassandraMutualizedQuotaDataDefinition;
 import 
org.apache.james.backends.cassandra.components.CassandraQuotaCurrentValueDao;
-import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.PlainBlobId;
 import org.apache.james.blob.memory.MemoryBlobStoreDAO;
 import org.apache.james.jmap.api.upload.JMAPCurrentUploadUsageCalculator;
 import 
org.apache.james.jmap.api.upload.JMAPCurrentUploadUsageCalculatorContract;
 import org.apache.james.jmap.api.upload.UploadRepository;
 import org.apache.james.jmap.api.upload.UploadUsageRepository;
-import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -48,9 +47,9 @@ public class CassandraJMAPCurrentUploadUsageCalculatorTest 
implements JMAPCurren
     @BeforeEach
     private void setup() {
         Clock clock = Clock.systemUTC();
+        BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
         uploadRepository = new CassandraUploadRepository(new 
UploadDAO(cassandraCluster.getCassandraCluster().getConf(),
-            new PlainBlobId.Factory()), new DeDuplicationBlobStore(new 
MemoryBlobStoreDAO(), BucketName.of("default"), new PlainBlobId.Factory()),
-            clock);
+            blobIdFactory), blobIdFactory, new MemoryBlobStoreDAO(), clock);
         uploadUsageRepository = new CassandraUploadUsageRepository(new 
CassandraQuotaCurrentValueDao(cassandraCluster.getCassandraCluster().getConf()));
         jmapCurrentUploadUsageCalculator = new 
JMAPCurrentUploadUsageCalculator(uploadRepository, uploadUsageRepository);
     }
diff --git 
a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java
 
b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java
index 14f1846576..98c90d12f5 100644
--- 
a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java
+++ 
b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java
@@ -22,13 +22,13 @@ package org.apache.james.jmap.cassandra.upload;
 import java.time.Clock;
 
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
-import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.PlainBlobId;
 import org.apache.james.blob.memory.MemoryBlobStoreDAO;
 import org.apache.james.jmap.api.model.UploadId;
 import org.apache.james.jmap.api.upload.UploadRepository;
 import org.apache.james.jmap.api.upload.UploadRepositoryContract;
-import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
 import org.apache.james.utils.UpdatableTickingClock;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
@@ -39,15 +39,17 @@ import com.datastax.oss.driver.api.core.uuid.Uuids;
 class CassandraUploadRepositoryTest implements UploadRepositoryContract {
     @RegisterExtension
     static CassandraClusterExtension cassandra = new 
CassandraClusterExtension(UploadDataDefinition.MODULE);
+    private BlobStoreDAO blobStoreDAO;
     private CassandraUploadRepository testee;
     private UpdatableTickingClock clock;
 
     @BeforeEach
     void setUp() {
         clock = new UpdatableTickingClock(Clock.systemUTC().instant());
+        BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
+        blobStoreDAO = new MemoryBlobStoreDAO();
         testee = new CassandraUploadRepository(new 
UploadDAO(cassandra.getCassandraCluster().getConf(),
-            new PlainBlobId.Factory()), new DeDuplicationBlobStore(new 
MemoryBlobStoreDAO(), BucketName.of("default"), new PlainBlobId.Factory()),
-            clock);
+            blobIdFactory), blobIdFactory, blobStoreDAO, clock);
     }
 
     @Override
@@ -77,4 +79,9 @@ class CassandraUploadRepositoryTest implements 
UploadRepositoryContract {
     public UpdatableTickingClock clock() {
         return clock;
     }
+
+    @Override
+    public BlobStoreDAO blobStoreDAO() {
+        return blobStoreDAO;
+    }
 }
\ No newline at end of file
diff --git 
a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadServiceTest.java
 
b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadServiceTest.java
index 1fad61d0bf..f5014b0612 100644
--- 
a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadServiceTest.java
+++ 
b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadServiceTest.java
@@ -26,7 +26,7 @@ import 
org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
 import 
org.apache.james.backends.cassandra.components.CassandraMutualizedQuotaDataDefinition;
 import 
org.apache.james.backends.cassandra.components.CassandraQuotaCurrentValueDao;
-import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.PlainBlobId;
 import org.apache.james.blob.memory.MemoryBlobStoreDAO;
 import org.apache.james.jmap.api.upload.UploadRepository;
@@ -35,7 +35,6 @@ import org.apache.james.jmap.api.upload.UploadServiceContract;
 import org.apache.james.jmap.api.upload.UploadServiceDefaultImpl;
 import org.apache.james.jmap.api.upload.UploadUsageRepository;
 import 
org.apache.james.mailbox.cassandra.modules.CassandraMailboxQuotaDataDefinition;
-import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -51,8 +50,9 @@ class CassandraUploadServiceTest implements 
UploadServiceContract {
     @BeforeEach
     void setUp(CassandraCluster cassandraCluster) {
         Clock clock = Clock.systemUTC();
-        uploadRepository = new CassandraUploadRepository(new 
UploadDAO(cassandraCluster.getConf(), new PlainBlobId.Factory()), new 
DeDuplicationBlobStore(new MemoryBlobStoreDAO(),
-            BucketName.of("default"), new PlainBlobId.Factory()), clock);
+        BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
+        uploadRepository = new CassandraUploadRepository(new 
UploadDAO(cassandraCluster.getConf(),
+            blobIdFactory), blobIdFactory, new MemoryBlobStoreDAO(), clock);
         uploadUsageRepository = new CassandraUploadUsageRepository(new 
CassandraQuotaCurrentValueDao(cassandraCluster.getConf()));
         testee = new UploadServiceDefaultImpl(uploadRepository, 
uploadUsageRepository, UploadServiceContract.TEST_CONFIGURATION());
     }
diff --git 
a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java
 
b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java
index 35d2c7b86c..737f1f8efb 100644
--- 
a/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java
+++ 
b/server/data/data-jmap-postgres/src/main/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepository.java
@@ -20,7 +20,6 @@
 package org.apache.james.jmap.postgres.upload;
 
 import static 
org.apache.james.backends.postgres.PostgresCommons.INSTANT_TO_LOCAL_DATE_TIME;
-import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.io.InputStream;
@@ -31,7 +30,8 @@ import java.time.LocalDateTime;
 import jakarta.inject.Inject;
 import jakarta.inject.Singleton;
 
-import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.core.Username;
 import org.apache.james.jmap.api.model.Upload;
@@ -49,17 +49,19 @@ import reactor.core.publisher.Mono;
 
 public class PostgresUploadRepository implements UploadRepository {
     public static final BucketName UPLOAD_BUCKET = 
BucketName.of("jmap-uploads");
-    private final BlobStore blobStore;
+    private final BlobId.Factory blobIdFactory;
+    private final BlobStoreDAO blobStoreDAO;
     private final Clock clock;
     private final PostgresUploadDAO.Factory uploadDAOFactory;
     private final PostgresUploadDAO byPassRLSUploadDAO;
 
     @Inject
     @Singleton
-    public PostgresUploadRepository(BlobStore blobStore, Clock clock,
+    public PostgresUploadRepository(BlobId.Factory blobIdFactory, BlobStoreDAO 
blobStoreDAO, Clock clock,
                                     PostgresUploadDAO.Factory uploadDAOFactory,
                                     PostgresUploadDAO byPassRLSUploadDAO) {
-        this.blobStore = blobStore;
+        this.blobIdFactory = blobIdFactory;
+        this.blobStoreDAO = blobStoreDAO;
         this.clock = clock;
         this.uploadDAOFactory = uploadDAOFactory;
         this.byPassRLSUploadDAO = byPassRLSUploadDAO;
@@ -68,17 +70,20 @@ public class PostgresUploadRepository implements 
UploadRepository {
     @Override
     public Mono<UploadMetaData> upload(InputStream data, ContentType 
contentType, Username user) {
         UploadId uploadId = generateId();
+        BlobId blobId = blobIdFactory.of(uploadId.asString());
         PostgresUploadDAO uploadDAO = 
uploadDAOFactory.create(user.getDomainPart());
+
         return Mono.fromCallable(() -> new CountingInputStream(data))
-            .flatMap(countingInputStream -> 
Mono.from(blobStore.save(UPLOAD_BUCKET, countingInputStream, LOW_COST))
-                .map(blobId -> UploadMetaData.from(uploadId, contentType, 
countingInputStream.getCount(), blobId, clock.instant()))
-                .flatMap(uploadMetaData -> uploadDAO.insert(uploadMetaData, 
user)));
+            .flatMap(countingInputStream -> 
Mono.from(blobStoreDAO.save(UPLOAD_BUCKET, blobId, countingInputStream))
+                    .thenReturn(countingInputStream))
+                .map(countingInputStream -> UploadMetaData.from(uploadId, 
contentType, countingInputStream.getCount(), blobId, clock.instant()))
+                .flatMap(uploadMetaData -> uploadDAO.insert(uploadMetaData, 
user));
     }
 
     @Override
     public Mono<Upload> retrieve(UploadId id, Username user) {
         return uploadDAOFactory.create(user.getDomainPart()).get(id, user)
-            .flatMap(upload -> Mono.from(blobStore.readReactive(UPLOAD_BUCKET, 
upload.blobId(), LOW_COST))
+            .flatMap(upload -> 
Mono.from(blobStoreDAO.readReactive(UPLOAD_BUCKET, upload.blobId()))
                 .map(inputStream -> Upload.from(upload, () -> inputStream)))
             .switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id)));
     }
@@ -101,7 +106,7 @@ public class PostgresUploadRepository implements 
UploadRepository {
             .flatMap(uploadPair -> {
                 Username username = uploadPair.getRight();
                 UploadMetaData upload = uploadPair.getLeft();
-                return Mono.from(blobStore.delete(UPLOAD_BUCKET, 
upload.blobId()))
+                return Mono.from(blobStoreDAO.delete(UPLOAD_BUCKET, 
upload.blobId()))
                     .then(byPassRLSUploadDAO.delete(upload.uploadId(), 
username));
             }, DEFAULT_CONCURRENCY)
             .then();
diff --git 
a/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepositoryTest.java
 
b/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepositoryTest.java
index d7b89094b7..80f2f48ec2 100644
--- 
a/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepositoryTest.java
+++ 
b/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadRepositoryTest.java
@@ -24,13 +24,11 @@ import java.time.Clock;
 import org.apache.james.backends.postgres.PostgresDataDefinition;
 import org.apache.james.backends.postgres.PostgresExtension;
 import org.apache.james.blob.api.BlobId;
-import org.apache.james.blob.api.BlobStore;
-import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.PlainBlobId;
 import org.apache.james.blob.memory.MemoryBlobStoreDAO;
 import org.apache.james.jmap.api.upload.UploadRepository;
 import org.apache.james.jmap.api.upload.UploadRepositoryContract;
-import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
 import org.apache.james.utils.UpdatableTickingClock;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -40,6 +38,7 @@ class PostgresUploadRepositoryTest implements 
UploadRepositoryContract {
     @RegisterExtension
     static PostgresExtension postgresExtension = 
PostgresExtension.withoutRowLevelSecurity(
         
PostgresDataDefinition.aggregateModules(PostgresUploadDataDefinition.MODULE));
+    private BlobStoreDAO blobStoreDAO;
     private UploadRepository testee;
     private UpdatableTickingClock clock;
 
@@ -47,10 +46,10 @@ class PostgresUploadRepositoryTest implements 
UploadRepositoryContract {
     void setUp() {
         clock = new UpdatableTickingClock(Clock.systemUTC().instant());
         BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
-        BlobStore blobStore = new DeDuplicationBlobStore(new 
MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory);
         PostgresUploadDAO uploadDAO = new 
PostgresUploadDAO(postgresExtension.getDefaultPostgresExecutor(), 
blobIdFactory);
         PostgresUploadDAO.Factory uploadFactory = new 
PostgresUploadDAO.Factory(blobIdFactory, 
postgresExtension.getExecutorFactory());
-        testee = new PostgresUploadRepository(blobStore, clock, uploadFactory, 
uploadDAO);
+        blobStoreDAO = new MemoryBlobStoreDAO();
+        testee = new PostgresUploadRepository(blobIdFactory, blobStoreDAO, 
clock, uploadFactory, uploadDAO);
     }
 
     @Override
@@ -62,4 +61,9 @@ class PostgresUploadRepositoryTest implements 
UploadRepositoryContract {
     public UpdatableTickingClock clock() {
         return clock;
     }
+
+    @Override
+    public BlobStoreDAO blobStoreDAO() {
+        return blobStoreDAO;
+    }
 }
diff --git 
a/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadServiceTest.java
 
b/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadServiceTest.java
index 2c220e5769..81bf880be6 100644
--- 
a/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadServiceTest.java
+++ 
b/server/data/data-jmap-postgres/src/test/java/org/apache/james/jmap/postgres/upload/PostgresUploadServiceTest.java
@@ -26,8 +26,6 @@ import org.apache.james.backends.postgres.PostgresExtension;
 import org.apache.james.backends.postgres.quota.PostgresQuotaCurrentValueDAO;
 import org.apache.james.backends.postgres.quota.PostgresQuotaDataDefinition;
 import org.apache.james.blob.api.BlobId;
-import org.apache.james.blob.api.BlobStore;
-import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.PlainBlobId;
 import org.apache.james.blob.memory.MemoryBlobStoreDAO;
 import org.apache.james.jmap.api.upload.UploadRepository;
@@ -35,7 +33,6 @@ import org.apache.james.jmap.api.upload.UploadService;
 import org.apache.james.jmap.api.upload.UploadServiceContract;
 import org.apache.james.jmap.api.upload.UploadServiceDefaultImpl;
 import org.apache.james.jmap.api.upload.UploadUsageRepository;
-import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -52,10 +49,9 @@ public class PostgresUploadServiceTest implements 
UploadServiceContract {
     @BeforeEach
     void setUp() {
         BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
-        BlobStore blobStore = new DeDuplicationBlobStore(new 
MemoryBlobStoreDAO(), BucketName.DEFAULT, blobIdFactory);
         PostgresUploadDAO uploadDAO = new 
PostgresUploadDAO(postgresExtension.getDefaultPostgresExecutor(), 
blobIdFactory);
         PostgresUploadDAO.Factory uploadFactory = new 
PostgresUploadDAO.Factory(blobIdFactory, 
postgresExtension.getExecutorFactory());
-        uploadRepository = new PostgresUploadRepository(blobStore, 
Clock.systemUTC(), uploadFactory, uploadDAO);
+        uploadRepository = new PostgresUploadRepository(blobIdFactory, new 
MemoryBlobStoreDAO(), Clock.systemUTC(), uploadFactory, uploadDAO);
         uploadUsageRepository = new PostgresUploadUsageRepository(new 
PostgresQuotaCurrentValueDAO(postgresExtension.getDefaultPostgresExecutor()));
         testee = new UploadServiceDefaultImpl(uploadRepository, 
uploadUsageRepository, UploadServiceContract.TEST_CONFIGURATION());
     }
diff --git 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepository.java
 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepository.java
index 9823214f98..c76b70c482 100644
--- 
a/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepository.java
+++ 
b/server/data/data-jmap/src/main/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepository.java
@@ -31,7 +31,8 @@ import java.util.Map;
 import jakarta.inject.Inject;
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.core.Username;
 import org.apache.james.jmap.api.model.Upload;
@@ -49,18 +50,20 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class InMemoryUploadRepository implements UploadRepository {
+    private static final BucketName UPLOAD_BUCKET = 
BucketName.of("jmap-uploads");
 
     private final Map<UploadId, ImmutablePair<Username, UploadMetaData>> 
uploadStore;
-
-    private final BlobStore blobStore;
+    private final BlobId.Factory blobIdFactory;
+    private final BlobStoreDAO blobStoreDAO;
     private final BucketName bucketName;
 
     private final Clock clock;
 
     @Inject
-    public InMemoryUploadRepository(BlobStore blobStore, Clock clock) {
-        this.blobStore = blobStore;
-        this.bucketName = blobStore.getDefaultBucketName();
+    public InMemoryUploadRepository(BlobId.Factory blobIdFactory, BlobStoreDAO 
blobStoreDAO, Clock clock) {
+        this.blobIdFactory = blobIdFactory;
+        this.blobStoreDAO = blobStoreDAO;
+        this.bucketName = UPLOAD_BUCKET;
         this.clock = clock;
         this.uploadStore = new HashMap<>();
     }
@@ -71,15 +74,17 @@ public class InMemoryUploadRepository implements 
UploadRepository {
         Preconditions.checkNotNull(contentType);
         Preconditions.checkNotNull(user);
 
+        UploadId uploadId = UploadId.random();
+        BlobId blobId = blobIdFactory.of(uploadId.asString());
+
         return Mono.fromCallable(() -> new CountingInputStream(data))
-            .flatMap(dataAsByte -> Mono.from(blobStore.save(bucketName, 
dataAsByte, BlobStore.StoragePolicy.LOW_COST))
-                .map(blobId -> {
-                    UploadId uploadId = UploadId.random();
+            .flatMap(dataAsByte -> Mono.from(blobStoreDAO.save(bucketName, 
blobId, dataAsByte))
+                    .thenReturn(dataAsByte))
+                .map(dataAsByte -> {
                     Instant uploadDate = clock.instant();
                     uploadStore.put(uploadId, new ImmutablePair<>(user, 
UploadMetaData.from(uploadId, contentType, dataAsByte.getCount(), blobId, 
uploadDate)));
                     return UploadMetaData.from(uploadId, contentType, 
dataAsByte.getCount(), blobId, uploadDate);
-                })
-            );
+                });
     }
 
     @Override
@@ -116,13 +121,13 @@ public class InMemoryUploadRepository implements 
UploadRepository {
         Instant expirationTime = clock.instant().minus(expireDuration);
         return Flux.fromIterable(List.copyOf(uploadStore.values()))
             .filter(pair -> pair.right.uploadDate().isBefore(expirationTime))
-            .flatMap(pair -> Mono.from(blobStore.delete(bucketName, 
pair.right.blobId()))
+            .flatMap(pair -> Mono.from(blobStoreDAO.delete(bucketName, 
pair.right.blobId()))
                 .then(Mono.fromRunnable(() -> 
uploadStore.remove(pair.right.uploadId()))))
             .then();
     }
 
     private Mono<Upload> retrieveUpload(UploadMetaData uploadMetaData) {
-        return Mono.from(blobStore.readBytes(bucketName, 
uploadMetaData.blobId()))
+        return Mono.from(blobStoreDAO.readBytes(bucketName, 
uploadMetaData.blobId()))
             .map(content -> Upload.from(uploadMetaData, () -> new 
ByteArrayInputStream(content)));
     }
 }
diff --git 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala
 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala
index f544408968..a4759dade5 100644
--- 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala
+++ 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala
@@ -21,14 +21,15 @@
 
  import java.io.InputStream
  import java.nio.charset.StandardCharsets
- import java.time.{Clock, Duration}
+ import java.time.Duration
  import java.util.UUID
 
  import org.apache.commons.io.IOUtils
+ import org.apache.james.blob.api.{BlobId, BlobStoreDAO, BucketName, 
ObjectNotFoundException}
  import org.apache.james.core.Username
  import org.apache.james.jmap.api.model.Size.sanitizeSize
  import org.apache.james.jmap.api.model.{Upload, UploadId, UploadMetaData, 
UploadNotFoundException}
- import 
org.apache.james.jmap.api.upload.UploadRepositoryContract.{CONTENT_TYPE, 
DATA_STRING, USER}
+ import 
org.apache.james.jmap.api.upload.UploadRepositoryContract.{CONTENT_TYPE, 
DATA_STRING, UPLOAD_BUCKET, USER}
  import org.apache.james.mailbox.model.ContentType
  import org.apache.james.utils.UpdatableTickingClock
  import org.assertj.core.api.Assertions.{assertThat, assertThatCode, 
assertThatThrownBy}
@@ -43,6 +44,7 @@
      .of("text/html")
    private lazy val DATA_STRING: String = "123321"
    private lazy val USER: Username = Username.of("Bob")
+   private lazy val UPLOAD_BUCKET: BucketName = BucketName.of("jmap-uploads")
  }
 
  trait UploadRepositoryContract {
@@ -53,6 +55,8 @@
 
    def clock: UpdatableTickingClock
 
+   def blobStoreDAO: BlobStoreDAO
+
    def data(): InputStream = IOUtils.toInputStream(DATA_STRING, 
StandardCharsets.UTF_8)
 
    @Test
@@ -218,4 +222,15 @@
      assertThat(SMono.fromPublisher(testee.retrieve(uploadId2, USER)).block())
        .isNotNull
    }
+
+   @Test
+   def deleteByUploadDateBeforeShouldRemoveExpiredUploadsFromBlobstore(): Unit 
= {
+     val blobId: BlobId = SMono.fromPublisher(testee.upload(data(), 
CONTENT_TYPE, USER)).block().blobId
+     clock.setInstant(clock.instant().plus(8, 
java.time.temporal.ChronoUnit.DAYS))
+
+     SMono(testee.deleteByUploadDateBefore(Duration.ofDays(7))).block();
+
+     assertThatThrownBy(() => blobStoreDAO.read(UPLOAD_BUCKET, blobId))
+       .isInstanceOf(classOf[ObjectNotFoundException])
+   }
  }
diff --git 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryJMAPCurrentUploadUsageCalculatorTest.java
 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryJMAPCurrentUploadUsageCalculatorTest.java
index bddc9d808f..b0d7368980 100644
--- 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryJMAPCurrentUploadUsageCalculatorTest.java
+++ 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryJMAPCurrentUploadUsageCalculatorTest.java
@@ -21,15 +21,12 @@ package org.apache.james.jmap.memory.upload;
 
 import java.time.Clock;
 
-import org.apache.james.blob.api.BlobStore;
-import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.PlainBlobId;
 import org.apache.james.blob.memory.MemoryBlobStoreDAO;
 import org.apache.james.jmap.api.upload.JMAPCurrentUploadUsageCalculator;
 import 
org.apache.james.jmap.api.upload.JMAPCurrentUploadUsageCalculatorContract;
 import org.apache.james.jmap.api.upload.UploadRepository;
 import org.apache.james.jmap.api.upload.UploadUsageRepository;
-import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
 import org.junit.jupiter.api.BeforeEach;
 
 public class InMemoryJMAPCurrentUploadUsageCalculatorTest implements 
JMAPCurrentUploadUsageCalculatorContract {
@@ -40,8 +37,7 @@ public class InMemoryJMAPCurrentUploadUsageCalculatorTest 
implements JMAPCurrent
 
     @BeforeEach
     private void setup() {
-        BlobStore blobStore = new DeDuplicationBlobStore(new 
MemoryBlobStoreDAO(), BucketName.DEFAULT, new PlainBlobId.Factory());
-        uploadRepository = new InMemoryUploadRepository(blobStore, 
Clock.systemUTC());
+        uploadRepository = new InMemoryUploadRepository(new 
PlainBlobId.Factory(), new MemoryBlobStoreDAO(), Clock.systemUTC());
         uploadUsageRepository = new InMemoryUploadUsageRepository();
         jmapCurrentUploadUsageCalculator = new 
JMAPCurrentUploadUsageCalculator(uploadRepository, uploadUsageRepository);
     }
diff --git 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepositoryTest.java
 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepositoryTest.java
index d1f37e01ce..aa4b62effa 100644
--- 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepositoryTest.java
+++ 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadRepositoryTest.java
@@ -21,26 +21,25 @@ package org.apache.james.jmap.memory.upload;
 
 import java.time.Clock;
 
-import org.apache.james.blob.api.BlobStore;
-import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.PlainBlobId;
 import org.apache.james.blob.memory.MemoryBlobStoreDAO;
 import org.apache.james.jmap.api.upload.UploadRepository;
 import org.apache.james.jmap.api.upload.UploadRepositoryContract;
-import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
 import org.apache.james.utils.UpdatableTickingClock;
 import org.junit.jupiter.api.BeforeEach;
 
 public class InMemoryUploadRepositoryTest implements UploadRepositoryContract {
 
+    private BlobStoreDAO blobStoreDAO;
     private UploadRepository testee;
     private UpdatableTickingClock clock;
 
     @BeforeEach
     void setUp() {
-        BlobStore blobStore = new DeDuplicationBlobStore(new 
MemoryBlobStoreDAO(), BucketName.DEFAULT, new PlainBlobId.Factory());
         clock = new UpdatableTickingClock(Clock.systemUTC().instant());
-        testee = new InMemoryUploadRepository(blobStore, clock);
+        blobStoreDAO = new MemoryBlobStoreDAO();
+        testee = new InMemoryUploadRepository(new PlainBlobId.Factory(), 
blobStoreDAO, clock);
     }
 
     @Override
@@ -52,4 +51,9 @@ public class InMemoryUploadRepositoryTest implements 
UploadRepositoryContract {
     public UpdatableTickingClock clock() {
         return clock;
     }
+
+    @Override
+    public BlobStoreDAO blobStoreDAO() {
+        return blobStoreDAO;
+    }
 }
diff --git 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadServiceTest.java
 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadServiceTest.java
index 628761c71d..15c7c160ac 100644
--- 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadServiceTest.java
+++ 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/memory/upload/InMemoryUploadServiceTest.java
@@ -21,8 +21,6 @@ package org.apache.james.jmap.memory.upload;
 
 import java.time.Clock;
 
-import org.apache.james.blob.api.BlobStore;
-import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.PlainBlobId;
 import org.apache.james.blob.memory.MemoryBlobStoreDAO;
 import org.apache.james.jmap.api.upload.UploadRepository;
@@ -30,7 +28,6 @@ import org.apache.james.jmap.api.upload.UploadService;
 import org.apache.james.jmap.api.upload.UploadServiceContract;
 import org.apache.james.jmap.api.upload.UploadServiceDefaultImpl;
 import org.apache.james.jmap.api.upload.UploadUsageRepository;
-import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
 import org.junit.jupiter.api.BeforeEach;
 
 public class InMemoryUploadServiceTest implements UploadServiceContract {
@@ -41,8 +38,7 @@ public class InMemoryUploadServiceTest implements 
UploadServiceContract {
 
     @BeforeEach
     void setUp() {
-        BlobStore blobStore = new DeDuplicationBlobStore(new 
MemoryBlobStoreDAO(), BucketName.DEFAULT, new PlainBlobId.Factory());
-        uploadRepository = new InMemoryUploadRepository(blobStore, 
Clock.systemUTC());
+        uploadRepository = new InMemoryUploadRepository(new 
PlainBlobId.Factory(), new MemoryBlobStoreDAO(), Clock.systemUTC());
         uploadUsageRepository = new InMemoryUploadUsageRepository();
         testee = new UploadServiceDefaultImpl(uploadRepository, 
uploadUsageRepository, UploadServiceContract.TEST_CONFIGURATION());
     }
diff --git 
a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/JmapUploadRoutesTest.java
 
b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/JmapUploadRoutesTest.java
index bcf58aa651..3b1b8c8ba0 100644
--- 
a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/JmapUploadRoutesTest.java
+++ 
b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/JmapUploadRoutesTest.java
@@ -36,7 +36,9 @@ import java.util.Map;
 import org.apache.commons.io.IOUtils;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraDataDefinition;
+import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BlobStoreDAO;
 import org.apache.james.blob.api.BucketName;
 import org.apache.james.blob.api.PlainBlobId;
 import org.apache.james.blob.memory.MemoryBlobStoreDAO;
@@ -97,12 +99,14 @@ class JmapUploadRoutesTest {
     void setUp() {
         taskManager = new MemoryTaskManager(new Hostname("foo"));
         clock = new UpdatableTickingClock(TIMESTAMP.toInstant());
-        blobStore = new PassThroughBlobStore(new MemoryBlobStoreDAO(),
+        BlobStoreDAO blobStoreDAO = new MemoryBlobStoreDAO();
+        BlobId.Factory blobIdFactory = new PlainBlobId.Factory();
+        blobStore = new PassThroughBlobStore(blobStoreDAO,
             BucketName.of("default"),
-            new PlainBlobId.Factory());
+            blobIdFactory);
 
         cassandraUploadRepository = new CassandraUploadRepository(new 
UploadDAO(cassandraCluster.getCassandraCluster().getConf(),
-            new PlainBlobId.Factory()), blobStore, clock);
+            blobIdFactory), blobIdFactory, blobStoreDAO, clock);
 
         JsonTransformer jsonTransformer = new JsonTransformer();
         TasksRoutes tasksRoutes = new TasksRoutes(taskManager, 
jsonTransformer, 
DTOConverter.of(UploadCleanupTaskAdditionalInformationDTO.SERIALIZATION_MODULE));


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org


Reply via email to