This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a7724744df644fbd1f763be0673d75d190324e68
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Aug 4 17:27:23 2021 +0700

    JAMES-3544 Implement a distributed UploadRepository
---
 server/data/data-jmap-cassandra/pom.xml            |  10 ++
 .../upload/CassandraUploadRepository.java          |  68 +++++++++
 .../jmap/cassandra/upload/UploadConfiguration.java |  61 ++++++++
 .../james/jmap/cassandra/upload/UploadDAO.java     | 166 +++++++++++++++++++++
 .../james/jmap/cassandra/upload/UploadModule.java  |  59 ++++++++
 .../upload/CassandraUploadRepositoryTest.java      |  80 ++++++++++
 .../jmap/api/upload/UploadRepositoryContract.scala |   4 +-
 7 files changed, 447 insertions(+), 1 deletion(-)

diff --git a/server/data/data-jmap-cassandra/pom.xml 
b/server/data/data-jmap-cassandra/pom.xml
index 567591d..ec5a898 100644
--- a/server/data/data-jmap-cassandra/pom.xml
+++ b/server/data/data-jmap-cassandra/pom.xml
@@ -56,6 +56,16 @@
         </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
+            <artifactId>blob-memory</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>blob-storage-strategy</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
             <artifactId>event-sourcing-event-store-cassandra</artifactId>
         </dependency>
         <dependency>
diff --git 
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java
 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java
new file mode 100644
index 0000000..d32eace
--- /dev/null
+++ 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java
@@ -0,0 +1,68 @@
+package org.apache.james.jmap.cassandra.upload;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+
+import java.io.InputStream;
+
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.Upload;
+import org.apache.james.jmap.api.model.UploadId;
+import org.apache.james.jmap.api.model.UploadMetaData;
+import org.apache.james.jmap.api.model.UploadNotFoundException;
+import org.apache.james.jmap.api.upload.UploadRepository;
+import org.apache.james.mailbox.model.ContentType;
+import org.reactivestreams.Publisher;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.io.CountingInputStream;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class CassandraUploadRepository implements UploadRepository {
+    private final UploadDAO uploadDAO;
+    private final BlobStore blobStore;
+    private final BucketNameGenerator bucketNameGenerator;
+
+    public CassandraUploadRepository(UploadDAO uploadDAO, BlobStore blobStore, 
BucketNameGenerator bucketNameGenerator) {
+        this.uploadDAO = uploadDAO;
+        this.blobStore = blobStore;
+        this.bucketNameGenerator = bucketNameGenerator;
+    }
+
+    @Override
+    public Publisher<UploadId> upload(InputStream data, ContentType 
contentType, Username user) {
+        UploadId uploadId = generateId();
+        UploadBucketName uploadBucketName = bucketNameGenerator.current();
+        BucketName bucketName = uploadBucketName.asBucketName();
+
+        return Mono.fromCallable(() -> new CountingInputStream(data))
+            .flatMap(countingInputStream -> 
Mono.from(blobStore.save(bucketName, countingInputStream, LOW_COST))
+                .map(blobId -> new UploadDAO.UploadRepresentation(uploadId, 
bucketName, blobId, contentType, countingInputStream.getCount(), user))
+                .flatMap(upload -> 
uploadDAO.save(upload).thenReturn(upload.getId())));
+    }
+
+    @Override
+    public Publisher<Upload> retrieve(UploadId id, Username user) {
+        return uploadDAO.retrieve(id)
+            .filter(upload -> upload.getUser().equals(user))
+            .map(upload -> Upload.from(
+                UploadMetaData.from(id, upload.getContentType(), 
upload.getSize(), upload.getBlobId()),
+                () -> blobStore.read(upload.getBucketName(), 
upload.getBlobId(), LOW_COST)))
+            .switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id)));
+    }
+
+    public Mono<Void> purge() {
+        return Flux.from(blobStore.listBuckets())
+            .<UploadBucketName>handle((bucketName, sink) -> 
UploadBucketName.ofBucket(bucketName).ifPresentOrElse(sink::next, 
sink::complete))
+            .filter(bucketNameGenerator.evictionPredicate())
+            .concatMap(bucket -> blobStore.deleteBucket(bucket.asBucketName()))
+            .then();
+    }
+
+    private UploadId generateId() {
+        return UploadId.from(UUIDs.timeBased());
+    }
+}
diff --git 
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadConfiguration.java
 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadConfiguration.java
new file mode 100644
index 0000000..675715d
--- /dev/null
+++ 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadConfiguration.java
@@ -0,0 +1,61 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.upload;
+
+import java.time.Duration;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+
+public class UploadConfiguration {
+    public static UploadConfiguration SINGLETON = new 
UploadConfiguration(Duration.ofDays(7));
+
+    private final Duration uploadTtlDuration;
+
+    public UploadConfiguration(Duration uploadTtlDuration) {
+        this.uploadTtlDuration = uploadTtlDuration;
+    }
+
+    public Duration getUploadTtlDuration() {
+        return uploadTtlDuration;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof UploadConfiguration) {
+            UploadConfiguration other = (UploadConfiguration) obj;
+            return Objects.equal(uploadTtlDuration, other.uploadTtlDuration);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(uploadTtlDuration);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects
+            .toStringHelper(this)
+            .add("uploadTtlDuration", uploadTtlDuration)
+            .toString();
+    }
+}
diff --git 
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadDAO.java
 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadDAO.java
new file mode 100644
index 0000000..07dd8b4
--- /dev/null
+++ 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadDAO.java
@@ -0,0 +1,166 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.upload;
+
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.ttl;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.BLOB_ID;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.BUCKET_ID;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.CONTENT_TYPE;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.ID;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.SIZE;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.TABLE_NAME;
+import static org.apache.james.jmap.cassandra.upload.UploadModule.USER;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.UploadId;
+import org.apache.james.mailbox.model.ContentType;
+
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Mono;
+
+public class UploadDAO {
+    public static class UploadRepresentation {
+        private final UploadId id;
+        private final BucketName bucketName;
+        private final BlobId blobId;
+        private final ContentType contentType;
+        private final long size;
+        private final Username user;
+
+        public UploadRepresentation(UploadId id, BucketName bucketName, BlobId 
blobId, ContentType contentType, long size, Username user) {
+            this.user = user;
+            Preconditions.checkArgument(size >= 0, "Size must be strictly 
positive");
+            this.id = id;
+            this.bucketName = bucketName;
+            this.blobId = blobId;
+            this.contentType = contentType;
+            this.size = size;
+        }
+
+        public UploadId getId() {
+            return id;
+        }
+
+        public BucketName getBucketName() {
+            return bucketName;
+        }
+
+        public BlobId getBlobId() {
+            return blobId;
+        }
+
+        public ContentType getContentType() {
+            return contentType;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public Username getUser() {
+            return user;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj instanceof UploadRepresentation) {
+                UploadRepresentation other = (UploadRepresentation) obj;
+                return Objects.equal(id, other.id)
+                    && Objects.equal(bucketName, other.bucketName)
+                    && Objects.equal(user, other.user)
+                    && Objects.equal(blobId, other.blobId)
+                    && Objects.equal(contentType, other.contentType)
+                    && Objects.equal(size, other.size);
+            }
+            return false;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(id, bucketName, blobId, contentType, size, 
user);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects
+                .toStringHelper(this)
+                .add("id", id)
+                .add("bucketName", bucketName)
+                .add("blobId", blobId)
+                .add("contentType", contentType)
+                .add("user", user)
+                .add("size", size)
+                .toString();
+        }
+    }
+
+    private final CassandraAsyncExecutor executor;
+    private final BlobId.Factory blobIdFactory;
+    private final PreparedStatement insert;
+    private final PreparedStatement selectOne;
+
+    public UploadDAO(Session session, BlobId.Factory blobIdFactory, 
UploadConfiguration configuration) {
+        this.executor = new CassandraAsyncExecutor(session);
+        this.blobIdFactory = blobIdFactory;
+        this.insert = session.prepare(insertInto(TABLE_NAME)
+            .value(ID, bindMarker(ID))
+            .value(BUCKET_ID, bindMarker(BUCKET_ID))
+            .value(BLOB_ID, bindMarker(BLOB_ID))
+            .value(SIZE, bindMarker(SIZE))
+            .value(USER, bindMarker(USER))
+            .value(CONTENT_TYPE, bindMarker(CONTENT_TYPE))
+            .using(ttl((int) 
configuration.getUploadTtlDuration().getSeconds())));
+        this.selectOne = session.prepare(select().from(TABLE_NAME)
+            .where(eq(ID, bindMarker(ID))));
+    }
+
+    public Mono<Void> save(UploadRepresentation uploadRepresentation) {
+        return executor.executeVoid(insert.bind()
+            .setUUID(ID, uploadRepresentation.getId().getId())
+            .setString(BUCKET_ID, 
uploadRepresentation.getBucketName().asString())
+            .setString(BLOB_ID, uploadRepresentation.getBlobId().asString())
+            .setLong(SIZE, uploadRepresentation.getSize())
+            .setString(USER, uploadRepresentation.getUser().asString())
+            .setString(CONTENT_TYPE, 
uploadRepresentation.getContentType().asString()));
+    }
+
+    public Mono<UploadRepresentation> retrieve(UploadId id) {
+        return executor.executeSingleRow(selectOne.bind()
+            .setUUID(ID, id.getId()))
+            .map(row -> new UploadRepresentation(id,
+                BucketName.of(row.getString(BUCKET_ID)),
+                blobIdFactory.from(row.getString(BLOB_ID)),
+                ContentType.of(row.getString(CONTENT_TYPE)),
+                row.getLong(SIZE),
+                Username.of(row.getString(USER))));
+    }
+}
diff --git 
a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadModule.java
 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadModule.java
new file mode 100644
index 0000000..52a677b
--- /dev/null
+++ 
b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/UploadModule.java
@@ -0,0 +1,59 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.upload;
+
+import static com.datastax.driver.core.DataType.bigint;
+import static com.datastax.driver.core.DataType.text;
+import static com.datastax.driver.core.DataType.timeuuid;
+import static 
com.datastax.driver.core.schemabuilder.TableOptions.CompactionOptions.TimeWindowCompactionStrategyOptions.CompactionWindowUnit.DAYS;
+import static 
org.apache.james.backends.cassandra.utils.CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+
+import com.datastax.driver.core.schemabuilder.SchemaBuilder;
+
+public interface UploadModule {
+
+    String TABLE_NAME = "uploads";
+
+    String ID = "id";
+    String CONTENT_TYPE = "content_type";
+    String SIZE = "size";
+    String BUCKET_ID = "bucket_id";
+    String BLOB_ID = "blob_id";
+    String USER = "user";
+
+    CassandraModule MODULE = CassandraModule.table(TABLE_NAME)
+        .comment("Holds JMAP uploads")
+        .options(options -> options
+            .compactionOptions(SchemaBuilder.timeWindowCompactionStrategy()
+                .compactionWindowSize(7)
+                .compactionWindowUnit(DAYS))
+            .caching(SchemaBuilder.KeyCaching.ALL, 
SchemaBuilder.rows(DEFAULT_CACHED_ROW_PER_PARTITION)))
+        .statement(statement -> statement
+            .addPartitionKey(ID, timeuuid())
+            .addColumn(CONTENT_TYPE, text())
+            .addColumn(SIZE, bigint())
+            .addColumn(BUCKET_ID, text())
+            .addColumn(BLOB_ID, text())
+            .addColumn(USER, text()))
+
+        .build();
+}
diff --git 
a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java
 
b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java
new file mode 100644
index 0000000..c4b71cd
--- /dev/null
+++ 
b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepositoryTest.java
@@ -0,0 +1,80 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.cassandra.upload;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.time.Clock;
+import java.time.Duration;
+
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.blob.api.HashBlobId;
+import org.apache.james.blob.memory.MemoryBlobStoreDAO;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.UploadId;
+import org.apache.james.jmap.api.model.UploadNotFoundException;
+import org.apache.james.jmap.api.upload.UploadRepository;
+import org.apache.james.jmap.api.upload.UploadRepositoryContract;
+import org.apache.james.mailbox.model.ContentType;
+import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.datastax.driver.core.utils.UUIDs;
+
+import reactor.core.publisher.Mono;
+
+class CassandraUploadRepositoryTest implements UploadRepositoryContract {
+    @RegisterExtension
+    static CassandraClusterExtension cassandra = new 
CassandraClusterExtension(UploadModule.MODULE);
+    private CassandraUploadRepository testee;
+
+    @BeforeEach
+    void setUp() {
+        testee = new CassandraUploadRepository(new 
UploadDAO(cassandra.getCassandraCluster().getConf(),
+            new HashBlobId.Factory(),
+            new UploadConfiguration(Duration.ofSeconds(5))),
+            new DeDuplicationBlobStore(new MemoryBlobStoreDAO(), 
BucketName.of("default"), new HashBlobId.Factory()),
+            new BucketNameGenerator(Clock.systemUTC()));
+    }
+
+    @Override
+    public UploadId randomUploadId() {
+        return UploadId.from(UUIDs.timeBased());
+    }
+
+    @Override
+    public UploadRepository testee() {
+        return testee;
+    }
+
+    @Test
+    void uploadShouldExpire() throws Exception {
+        Username bob = Username.of("bob");
+        UploadId id = Mono.from(testee.upload(data(), 
ContentType.of("text/plain"), bob)).block();
+
+        Thread.sleep(6000);
+
+        assertThatThrownBy(() -> Mono.from(testee.retrieve(id, 
bob)).blockOptional())
+            .isInstanceOf(UploadNotFoundException.class);
+    }
+}
\ No newline at end of file
diff --git 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala
 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala
index 6757f90..0eaa000 100644
--- 
a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala
+++ 
b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/upload/UploadRepositoryContract.scala
@@ -42,6 +42,8 @@
 
  trait UploadRepositoryContract {
 
+   def randomUploadId(): UploadId = UploadId.from(UUID.randomUUID())
+
    def testee: UploadRepository
 
    def data(): InputStream = IOUtils.toInputStream(DATA_STRING, 
StandardCharsets.UTF_8)
@@ -87,7 +89,7 @@
 
    @Test
    def retrieveShouldThrowWhenUploadIdIsNotExist(): Unit = {
-     assertThatThrownBy(() => 
SMono.fromPublisher(testee.retrieve(UploadId.from(UUID.randomUUID()), 
USER)).block())
+     assertThatThrownBy(() => 
SMono.fromPublisher(testee.retrieve(randomUploadId(), USER)).block())
        .isInstanceOf(classOf[UploadNotFoundException])
    }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to