This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 333ba358d56b8fb0d7e9cd3971c237eb7ec79400 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Fri Jul 5 13:59:35 2019 +0200 JAMES-2810 Implement CassandraDeletedMessageMetadataVault Plugs previously written DAOs and combines them. --- .../deleted-messages-vault-cassandra/pom.xml | 11 +++ .../CassandraDeletedMessageMetadataVault.java | 80 ++++++++++++++++++++++ .../james/vault/metadata/UserPerBucketDAO.java | 12 ++++ .../CassandraDeletedMessageMetadataVaultTest.java | 55 +++++++++++++++ .../james/vault/metadata/UserPerBucketDAOTest.java | 28 ++++++++ 5 files changed, 186 insertions(+) diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/pom.xml b/mailbox/plugin/deleted-messages-vault-cassandra/pom.xml index c430b55..f18d226 100644 --- a/mailbox/plugin/deleted-messages-vault-cassandra/pom.xml +++ b/mailbox/plugin/deleted-messages-vault-cassandra/pom.xml @@ -50,6 +50,17 @@ <artifactId>apache-james-mailbox-deleted-messages-vault</artifactId> </dependency> <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>apache-james-mailbox-deleted-messages-vault</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>apache-james-mailbox-memory</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>nl.jqno.equalsverifier</groupId> <artifactId>equalsverifier</artifactId> <scope>test</scope> diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/CassandraDeletedMessageMetadataVault.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/CassandraDeletedMessageMetadataVault.java new file mode 100644 index 0000000..f5faf02 --- /dev/null +++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/CassandraDeletedMessageMetadataVault.java @@ -0,0 +1,80 @@ 
+/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.vault.metadata; + +import org.apache.james.blob.api.BucketName; +import org.apache.james.core.User; +import org.apache.james.mailbox.model.MessageId; +import org.reactivestreams.Publisher; + +/** Cassandra-backed {@link DeletedMessageMetadataVault} that combines {@link MetadataDAO}, {@link StorageInformationDAO} and {@link UserPerBucketDAO}. */ public class CassandraDeletedMessageMetadataVault implements DeletedMessageMetadataVault { + private final MetadataDAO metadataDAO; + private final StorageInformationDAO storageInformationDAO; + private final UserPerBucketDAO userPerBucketDAO; + + /** Package-private constructor; keeps the three DAOs every operation of this vault delegates to. */ CassandraDeletedMessageMetadataVault(MetadataDAO metadataDAO, StorageInformationDAO storageInformationDAO, UserPerBucketDAO userPerBucketDAO) { + this.metadataDAO = metadataDAO; + this.storageInformationDAO = storageInformationDAO; + this.userPerBucketDAO = userPerBucketDAO; + } + + /** Chains three writes with then(): references the storage information for (owner, messageId), stores the metadata, and finally adds the owner to the bucket in UserPerBucketDAO. */ @Override + public Publisher<Void> store(DeletedMessageWithStorageInformation deletedMessage) { + BucketName bucketName = deletedMessage.getStorageInformation().getBucketName(); + User owner = deletedMessage.getDeletedMessage().getOwner(); + MessageId messageId = 
deletedMessage.getDeletedMessage().getMessageId(); + return storageInformationDAO.referenceStorageInformation(owner, messageId, deletedMessage.getStorageInformation()) + .then(metadataDAO.store(deletedMessage)) + .then(userPerBucketDAO.addUser(bucketName, owner)); + } + + /** For each user UserPerBucketDAO lists for the bucket: deletes the storage information of every message id MetadataDAO returns for (bucket, user), then deletes that user's metadata in the bucket. Once every user has been handled, deletes the bucket from UserPerBucketDAO. */ @Override + public Publisher<Void> removeMetadataRelatedToBucket(BucketName bucketName) { + return userPerBucketDAO.retrieveUsers(bucketName) + .concatMap(user -> metadataDAO.retrieveMessageIds(bucketName, user) + .map(messageId -> new DeletedMessageIdentifier(user, messageId)) + .concatMap(deletedMessageIdentifier -> storageInformationDAO.deleteStorageInformation( + deletedMessageIdentifier.getOwner(), + deletedMessageIdentifier.getMessageId())) + .then(metadataDAO.deleteInBucket(bucketName, user))) + .then(userPerBucketDAO.deleteBucket(bucketName)); + } + + /** Deletes the storage information for (user, messageId), then the message's metadata in the given bucket. UserPerBucketDAO is not modified here. */ @Override + public Publisher<Void> remove(BucketName bucketName, User user, MessageId messageId) { + return storageInformationDAO.deleteStorageInformation(user, messageId) + .then(metadataDAO.deleteMessage(bucketName, user, messageId)); + } + + /** Delegates to StorageInformationDAO.retrieveStorageInformation for (user, messageId). */ @Override + public Publisher<StorageInformation> retrieveStorageInformation(User user, MessageId messageId) { + return storageInformationDAO.retrieveStorageInformation(user, messageId); + } + + /** Delegates to MetadataDAO.retrieveMetadata for (bucketName, user). */ @Override + public Publisher<DeletedMessageWithStorageInformation> listMessages(BucketName bucketName, User user) { + return metadataDAO.retrieveMetadata(bucketName, user); + } + + /** Delegates to UserPerBucketDAO.retrieveBuckets. */ @Override + public Publisher<BucketName> listRelatedBuckets() { + return userPerBucketDAO.retrieveBuckets(); + } +} diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/UserPerBucketDAO.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/UserPerBucketDAO.java index 743b91d..0525685 100644 --- a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/UserPerBucketDAO.java +++ 
b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/UserPerBucketDAO.java @@ -43,12 +43,14 @@ class UserPerBucketDAO { private final PreparedStatement addStatement; private final PreparedStatement removeStatement; private final PreparedStatement listStatement; + private final PreparedStatement listBucketsStatement; UserPerBucketDAO(Session session) { cassandraAsyncExecutor = new CassandraAsyncExecutor(session); addStatement = prepareAddUser(session); removeStatement = prepareRemoveBucket(session); listStatement = prepareListUser(session); + listBucketsStatement = prepareListBuckets(session); } private PreparedStatement prepareAddUser(Session session) { @@ -67,6 +69,10 @@ class UserPerBucketDAO { .where(eq(BUCKET_NAME, bindMarker(BUCKET_NAME)))); } + /** Prepares a statement selecting BUCKET_NAME from TABLE with a per-partition limit of 1. NOTE(review): presumably BUCKET_NAME is the partition key, so this yields one row per bucket - confirm against the table definition. */ private PreparedStatement prepareListBuckets(Session session) { + return session.prepare(select(BUCKET_NAME).from(TABLE).perPartitionLimit(1)); + } + Flux<User> retrieveUsers(BucketName bucketName) { return cassandraAsyncExecutor.executeRows(listStatement.bind() .setString(BUCKET_NAME, bucketName.asString())) @@ -74,6 +80,12 @@ class UserPerBucketDAO { .map(User::fromUsername); } + /** Executes the list-buckets statement (no bound parameters) and maps the BUCKET_NAME column of each returned row to a BucketName. */ Flux<BucketName> retrieveBuckets() { + return cassandraAsyncExecutor.executeRows(listBucketsStatement.bind()) + .map(row -> row.getString(BUCKET_NAME)) + .map(BucketName::of); + } + Mono<Void> addUser(BucketName bucketName, User user) { return cassandraAsyncExecutor.executeVoid(addStatement.bind() .setString(BUCKET_NAME, bucketName.asString()) diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/CassandraDeletedMessageMetadataVaultTest.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/CassandraDeletedMessageMetadataVaultTest.java new file mode 100644 index 0000000..b384c35 --- /dev/null +++ 
b/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/CassandraDeletedMessageMetadataVaultTest.java @@ -0,0 +1,55 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.vault.metadata; + +import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.MODULE; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.mailbox.inmemory.InMemoryId; +import org.apache.james.mailbox.inmemory.InMemoryMessageId; +import org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** Applies {@link DeletedMessageMetadataVaultContract} to {@link CassandraDeletedMessageMetadataVault}, using a CassandraClusterExtension created with the DeletedMessageMetadataModule MODULE. */ public class CassandraDeletedMessageMetadataVaultTest implements DeletedMessageMetadataVaultContract { + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE); + + private DeletedMessageMetadataVault testee; + + /** Before each test, builds the vault from the three DAOs over cassandra.getConf(), with a HashBlobId factory and in-memory message id / mailbox id factories feeding the DTO converter. */ @BeforeEach + void setUp(CassandraCluster cassandra) { + HashBlobId.Factory blobIdFactory = new HashBlobId.Factory(); + InMemoryMessageId.Factory messageIdFactory = new InMemoryMessageId.Factory(); + DeletedMessageWithStorageInformationConverter dtoConverter = new DeletedMessageWithStorageInformationConverter(blobIdFactory, messageIdFactory, new InMemoryId.Factory()); + + testee = new CassandraDeletedMessageMetadataVault( + new MetadataDAO(cassandra.getConf(), messageIdFactory, dtoConverter), + new StorageInformationDAO(cassandra.getConf(), blobIdFactory), + new UserPerBucketDAO(cassandra.getConf())); + } + + /** Returns the vault instance assigned in setUp. */ @Override + public DeletedMessageMetadataVault metadataVault() { + return testee; + } +} \ No newline at end of file diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/UserPerBucketDAOTest.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/UserPerBucketDAOTest.java index 94f655e..aedeeb9 100644 --- 
a/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/UserPerBucketDAOTest.java +++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/UserPerBucketDAOTest.java @@ -53,6 +53,11 @@ class UserPerBucketDAOTest { } @Test + void retrieveBucketsShouldReturnEmptyWhenNone() { + assertThat(testee.retrieveBuckets().toStream()).isEmpty(); + } + + @Test void retrieveUsersShouldReturnAddedUser() { testee.addUser(BUCKET_NAME, OWNER).block(); @@ -60,6 +65,13 @@ class UserPerBucketDAOTest { } @Test + void retrieveBucketsShouldReturnAddedBuckets() { + testee.addUser(BUCKET_NAME, OWNER).block(); + + assertThat(testee.retrieveBuckets().toStream()).containsExactly(BUCKET_NAME); + } + + @Test void retrieveUsersShouldReturnAddedUsers() { testee.addUser(BUCKET_NAME, OWNER).block(); testee.addUser(BUCKET_NAME, OWNER_2).block(); @@ -68,6 +80,14 @@ class UserPerBucketDAOTest { } @Test + void retrieveBucketsShouldNotReturnDuplicates() { + testee.addUser(BUCKET_NAME, OWNER).block(); + testee.addUser(BUCKET_NAME, OWNER_2).block(); + + assertThat(testee.retrieveBuckets().toStream()).containsExactly(BUCKET_NAME); + } + + @Test void retrieveUsersShouldNotReturnUsersOfOtherBuckets() { testee.addUser(BUCKET_NAME, OWNER).block(); testee.addUser(BUCKET_NAME_2, OWNER_2).block(); @@ -76,6 +96,14 @@ class UserPerBucketDAOTest { } @Test + /* Two buckets, each registered through a different user, must both be listed by retrieveBuckets(). */ void retrieveBucketsShouldReturnAllAddedBuckets() { + testee.addUser(BUCKET_NAME, OWNER).block(); + testee.addUser(BUCKET_NAME_2, OWNER_2).block(); + + assertThat(testee.retrieveBuckets().toStream()).containsExactlyInAnyOrder(BUCKET_NAME, BUCKET_NAME_2); + } + + @Test void addUserShouldBeIdempotent() { testee.addUser(BUCKET_NAME, OWNER).block(); testee.addUser(BUCKET_NAME, OWNER).block(); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org