This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 40524a8a37ec461fa705567b2a981318d953fe23 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Fri Jul 5 11:48:38 2019 +0200 JAMES-2810 Implement Metadata DAO Holds serialized metadata and allows storage of metadata and will be used as a building block of the CassandraDeletedMessageMetadataVault. --- .../metadata/DeletedMessageMetadataModule.java | 20 +++ .../apache/james/vault/metadata/MetadataDAO.java | 152 ++++++++++++++++++++ .../james/vault/metadata/MetadataDAOTest.java | 156 +++++++++++++++++++++ .../DeletedMessageMetadataVaultContract.java | 15 +- .../DeletedMessageVaultMetadataFixture.java | 11 ++ 5 files changed, 343 insertions(+), 11 deletions(-) diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageMetadataModule.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageMetadataModule.java index eceb87c..58cf5b0 100644 --- a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageMetadataModule.java +++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageMetadataModule.java @@ -43,6 +43,16 @@ public interface DeletedMessageMetadataModule { String USER = "user"; } + interface DeletedMessageMetadataTable { + String TABLE = "deletedMessageMetadata"; + + String BUCKET_NAME = "bucketName"; + String OWNER = "owner"; + String MESSAGE_ID = "messageId"; + + String PAYLOAD = "payload"; + } + CassandraModule MODULE = CassandraModule .builder() @@ -64,5 +74,15 @@ public interface DeletedMessageMetadataModule { .addPartitionKey(UserPerBucketTable.BUCKET_NAME, text()) .addClusteringColumn(UserPerBucketTable.USER, text())) + .table(DeletedMessageMetadataTable.TABLE) + .comment("Holds storage information for deleted messages in the BlobStore based DeletedMessages vault") + .options(options -> options + 
.caching(SchemaBuilder.KeyCaching.ALL, SchemaBuilder.noRows())) + .statement(statement -> statement + .addPartitionKey(DeletedMessageMetadataTable.BUCKET_NAME, text()) + .addPartitionKey(DeletedMessageMetadataTable.OWNER, text()) + .addClusteringColumn(DeletedMessageMetadataTable.MESSAGE_ID, text()) + .addColumn(DeletedMessageMetadataTable.PAYLOAD, text())) + .build(); } diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java new file mode 100644 index 0000000..34f77f5 --- /dev/null +++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/MetadataDAO.java @@ -0,0 +1,152 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.vault.metadata; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.BUCKET_NAME; +import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.MESSAGE_ID; +import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.OWNER; +import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.PAYLOAD; +import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.DeletedMessageMetadataTable.TABLE; + +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.blob.api.BucketName; +import org.apache.james.core.User; +import org.apache.james.mailbox.model.MessageId; +import org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter; +import org.apache.james.vault.dto.DeletedMessageWithStorageInformationDTO; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.github.fge.lambdas.Throwing; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +class MetadataDAO { + private static final Logger LOGGER = LoggerFactory.getLogger(MetadataDAO.class); + + private final 
CassandraAsyncExecutor cassandraAsyncExecutor; + private final PreparedStatement addStatement; + private final PreparedStatement removeStatement; + private final PreparedStatement removeAllStatement; + private final PreparedStatement readStatement; + private final PreparedStatement readMessageIdStatement; + private final MessageId.Factory messageIdFactory; + private final ObjectMapper objectMapper; + private final DeletedMessageWithStorageInformationConverter dtoConverter; + + + MetadataDAO(Session session, MessageId.Factory messageIdFactory, DeletedMessageWithStorageInformationConverter dtoConverter) { + this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session); + this.addStatement = prepareAdd(session); + this.removeStatement = prepareRemove(session); + this.removeAllStatement = prepareRemoveAll(session); + this.readStatement = prepareRead(session, PAYLOAD); + this.readMessageIdStatement = prepareRead(session, MESSAGE_ID); + this.messageIdFactory = messageIdFactory; + this.dtoConverter = dtoConverter; + this.objectMapper = new ObjectMapper() + .registerModule(new Jdk8Module()) + .setSerializationInclusion(JsonInclude.Include.NON_ABSENT); + } + + private PreparedStatement prepareRead(Session session, String fieldName) { + return session.prepare(select(fieldName).from(TABLE) + .where(eq(BUCKET_NAME, bindMarker(BUCKET_NAME))) + .and(eq(OWNER, bindMarker(OWNER)))); + } + + private PreparedStatement prepareAdd(Session session) { + return session.prepare(insertInto(TABLE) + .value(BUCKET_NAME, bindMarker(BUCKET_NAME)) + .value(OWNER, bindMarker(OWNER)) + .value(MESSAGE_ID, bindMarker(MESSAGE_ID)) + .value(PAYLOAD, bindMarker(PAYLOAD))); + } + + private PreparedStatement prepareRemove(Session session) { + return session.prepare(delete().from(TABLE) + .where(eq(OWNER, bindMarker(OWNER))) + .and(eq(BUCKET_NAME, bindMarker(BUCKET_NAME))) + .and(eq(MESSAGE_ID, bindMarker(MESSAGE_ID)))); + } + + private PreparedStatement prepareRemoveAll(Session session) { + return 
session.prepare(delete().from(TABLE) + .where(eq(OWNER, bindMarker(OWNER))) + .and(eq(BUCKET_NAME, bindMarker(BUCKET_NAME)))); + } + + + Mono<Void> store(DeletedMessageWithStorageInformation metadata) { + return Mono.just(metadata) + .map(DeletedMessageWithStorageInformationDTO::toDTO) + .map(Throwing.function(objectMapper::writeValueAsString)) + .flatMap(payload -> cassandraAsyncExecutor.executeVoid(addStatement.bind() + .setString(BUCKET_NAME, metadata.getStorageInformation().getBucketName().asString()) + .setString(OWNER, metadata.getDeletedMessage().getOwner().asString()) + .setString(MESSAGE_ID, metadata.getDeletedMessage().getMessageId().serialize()) + .setString(PAYLOAD, payload))); + } + + Flux<DeletedMessageWithStorageInformation> retrieveMetadata(BucketName bucketName, User user) { + return cassandraAsyncExecutor.executeRows( + readStatement.bind() + .setString(BUCKET_NAME, bucketName.asString()) + .setString(OWNER, user.asString())) + .map(row -> row.getString(PAYLOAD)) + .flatMap(string -> Mono.fromCallable(() -> objectMapper.readValue(string, DeletedMessageWithStorageInformationDTO.class)) + .onErrorResume(e -> Mono.fromRunnable(() -> LOGGER.error("Error deserializing JSON metadata", e)))) + .flatMap(dto -> Mono.fromCallable(() -> dtoConverter.toDomainObject(dto)) + .onErrorResume(e -> Mono.fromRunnable(() -> LOGGER.error("Error deserializing DTO", e)))); + } + + Flux<MessageId> retrieveMessageIds(BucketName bucketName, User user) { + return cassandraAsyncExecutor.executeRows( + readMessageIdStatement.bind() + .setString(BUCKET_NAME, bucketName.asString()) + .setString(OWNER, user.asString())) + .map(row -> row.getString(MESSAGE_ID)) + .map(messageIdFactory::fromString); + } + + Mono<Void> deleteMessage(BucketName bucketName, User user, MessageId messageId) { + return cassandraAsyncExecutor.executeVoid(removeStatement.bind() + .setString(BUCKET_NAME, bucketName.asString()) + .setString(OWNER, user.asString()) + .setString(MESSAGE_ID, 
messageId.serialize())); + } + + Mono<Void> deleteInBucket(BucketName bucketName, User user) { + return cassandraAsyncExecutor.executeVoid(removeAllStatement.bind() + .setString(BUCKET_NAME, bucketName.asString()) + .setString(OWNER, user.asString())); + } +} diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/MetadataDAOTest.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/MetadataDAOTest.java new file mode 100644 index 0000000..956d4e1 --- /dev/null +++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/MetadataDAOTest.java @@ -0,0 +1,156 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.vault.metadata; + +import static org.apache.james.vault.DeletedMessageFixture.MESSAGE_ID; +import static org.apache.james.vault.DeletedMessageFixture.USER; +import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.MODULE; +import static org.apache.james.vault.metadata.DeletedMessageVaultMetadataFixture.BUCKET_NAME; +import static org.apache.james.vault.metadata.DeletedMessageVaultMetadataFixture.DELETED_MESSAGE; +import static org.apache.james.vault.metadata.DeletedMessageVaultMetadataFixture.DELETED_MESSAGE_2; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.stream.Stream; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.mailbox.inmemory.InMemoryId; +import org.apache.james.mailbox.inmemory.InMemoryMessageId; +import org.apache.james.mailbox.model.MessageId; +import org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class MetadataDAOTest { + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE); + + private MetadataDAO testee; + + @BeforeEach + void setUp(CassandraCluster cassandra) { + DeletedMessageWithStorageInformationConverter dtoConverter = new DeletedMessageWithStorageInformationConverter( + new HashBlobId.Factory(), new InMemoryMessageId.Factory(), new InMemoryId.Factory()); + + testee = new MetadataDAO(cassandra.getConf(), new InMemoryMessageId.Factory(), dtoConverter); + } + + @Test + void retrieveMessageIdsShouldReturnEmptyWhenNone() { + Stream<MessageId> messageIds = testee.retrieveMessageIds(BUCKET_NAME, USER).toStream(); + 
assertThat(messageIds).isEmpty(); + } + + @Test + void retrieveMessageIdsShouldReturnStoredMessageId() { + testee.store(DELETED_MESSAGE).block(); + + Stream<MessageId> messageIds = testee.retrieveMessageIds(BUCKET_NAME, USER).toStream(); + assertThat(messageIds).containsExactly(DELETED_MESSAGE.getDeletedMessage().getMessageId()); + } + + @Test + void retrieveMessageIdsShouldNotReturnDeletedMessages() { + testee.store(DELETED_MESSAGE).block(); + + testee.deleteInBucket(BUCKET_NAME, USER).block(); + + Stream<MessageId> messageIds = testee.retrieveMessageIds(BUCKET_NAME, USER).toStream(); + assertThat(messageIds).isEmpty(); + } + + @Test + void deleteInBucketShouldClearAllUserMessages() { + testee.store(DELETED_MESSAGE).block(); + testee.store(DELETED_MESSAGE_2).block(); + + testee.deleteInBucket(BUCKET_NAME, USER).block(); + + Stream<MessageId> messageIds = testee.retrieveMessageIds(BUCKET_NAME, USER).toStream(); + assertThat(messageIds).isEmpty(); + } + + @Test + void retrieveMessageIdsShouldReturnStoredMessageIds() { + testee.store(DELETED_MESSAGE).block(); + testee.store(DELETED_MESSAGE_2).block(); + + Stream<MessageId> messageIds = testee.retrieveMessageIds(BUCKET_NAME, USER).toStream(); + assertThat(messageIds).containsExactlyInAnyOrder( + DELETED_MESSAGE.getDeletedMessage().getMessageId(), + DELETED_MESSAGE_2.getDeletedMessage().getMessageId()); + } + + @Test + void retrieveMetadataShouldReturnEmptyWhenNone() { + Stream<DeletedMessageWithStorageInformation> messageIds = testee.retrieveMetadata(BUCKET_NAME, USER).toStream(); + assertThat(messageIds).isEmpty(); + } + + @Test + void retrieveMetadataShouldReturnStoredMetadata() { + testee.store(DELETED_MESSAGE).block(); + + Stream<DeletedMessageWithStorageInformation> messageIds = testee.retrieveMetadata(BUCKET_NAME, USER).toStream(); + assertThat(messageIds).containsExactly(DELETED_MESSAGE); + } + + @Test + void retrieveMetadataShouldNotReturnDeletedMessages() { + testee.store(DELETED_MESSAGE).block(); + + 
testee.deleteInBucket(BUCKET_NAME, USER).block(); + + Stream<DeletedMessageWithStorageInformation> messageIds = testee.retrieveMetadata(BUCKET_NAME, USER).toStream(); + assertThat(messageIds).isEmpty(); + } + + @Test + void retrieveMetadataShouldReturnAllStoredMetadata() { + testee.store(DELETED_MESSAGE).block(); + testee.store(DELETED_MESSAGE_2).block(); + + Stream<DeletedMessageWithStorageInformation> messageIds = testee.retrieveMetadata(BUCKET_NAME, USER).toStream(); + assertThat(messageIds).containsExactlyInAnyOrder(DELETED_MESSAGE, DELETED_MESSAGE_2); + } + + @Test + void deleteMessageShouldDeleteASingleMessage() { + testee.store(DELETED_MESSAGE).block(); + testee.store(DELETED_MESSAGE_2).block(); + + testee.deleteMessage(BUCKET_NAME, USER, MESSAGE_ID).block(); + + Stream<DeletedMessageWithStorageInformation> messageIds = testee.retrieveMetadata(BUCKET_NAME, USER).toStream(); + assertThat(messageIds).containsExactlyInAnyOrder(DELETED_MESSAGE_2); + } + + @Test + void retrieveMetadataShouldNotReturnDeletedMetadata() { + testee.store(DELETED_MESSAGE).block(); + + testee.deleteMessage(BUCKET_NAME, USER, MESSAGE_ID).block(); + + Stream<DeletedMessageWithStorageInformation> messageIds = testee.retrieveMetadata(BUCKET_NAME, USER).toStream(); + assertThat(messageIds).isEmpty(); + } +} \ No newline at end of file diff --git a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/metadata/DeletedMessageMetadataVaultContract.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/metadata/DeletedMessageMetadataVaultContract.java index 6a72445..c27d31d 100644 --- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/metadata/DeletedMessageMetadataVaultContract.java +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/metadata/DeletedMessageMetadataVaultContract.java @@ -20,9 +20,11 @@ package org.apache.james.vault.metadata; import static 
org.apache.james.vault.DeletedMessageFixture.USER; -import static org.apache.james.vault.metadata.DeletedMessageVaultMetadataFixture.BLOB_ID_2; import static org.apache.james.vault.metadata.DeletedMessageVaultMetadataFixture.BUCKET_NAME; -import static org.apache.james.vault.metadata.DeletedMessageVaultMetadataFixture.STORAGE_INFORMATION; +import static org.apache.james.vault.metadata.DeletedMessageVaultMetadataFixture.DELETED_MESSAGE; +import static org.apache.james.vault.metadata.DeletedMessageVaultMetadataFixture.DELETED_MESSAGE_2; +import static org.apache.james.vault.metadata.DeletedMessageVaultMetadataFixture.DELETED_MESSAGE_2_OTHER_BUCKET; +import static org.apache.james.vault.metadata.DeletedMessageVaultMetadataFixture.OTHER_BUCKET_NAME; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -30,21 +32,12 @@ import java.util.Optional; import java.util.stream.Stream; import org.apache.james.blob.api.BucketName; -import org.apache.james.vault.DeletedMessageFixture; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; public interface DeletedMessageMetadataVaultContract { - DeletedMessageWithStorageInformation DELETED_MESSAGE = new DeletedMessageWithStorageInformation(DeletedMessageFixture.DELETED_MESSAGE, STORAGE_INFORMATION); - DeletedMessageWithStorageInformation DELETED_MESSAGE_2 = new DeletedMessageWithStorageInformation(DeletedMessageFixture.DELETED_MESSAGE_2, STORAGE_INFORMATION); - BucketName OTHER_BUCKET_NAME = BucketName.of("other"); - StorageInformation OTHER_STORAGE_INFORMATION = StorageInformation.builder() - .bucketName(OTHER_BUCKET_NAME) - .blobId(BLOB_ID_2); - DeletedMessageWithStorageInformation DELETED_MESSAGE_2_OTHER_BUCKET = new DeletedMessageWithStorageInformation(DeletedMessageFixture.DELETED_MESSAGE_2, - OTHER_STORAGE_INFORMATION); DeletedMessageMetadataVault metadataVault(); diff --git 
a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/metadata/DeletedMessageVaultMetadataFixture.java b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/metadata/DeletedMessageVaultMetadataFixture.java index 465ce39..26f9db8 100644 --- a/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/metadata/DeletedMessageVaultMetadataFixture.java +++ b/mailbox/plugin/deleted-messages-vault/src/test/java/org/apache/james/vault/metadata/DeletedMessageVaultMetadataFixture.java @@ -22,12 +22,23 @@ package org.apache.james.vault.metadata; import org.apache.james.blob.api.BlobId; import org.apache.james.blob.api.BucketName; import org.apache.james.blob.api.HashBlobId; +import org.apache.james.vault.DeletedMessageFixture; public interface DeletedMessageVaultMetadataFixture { BlobId BLOB_ID = new HashBlobId.Factory().from("05dcb33b-8382-4744-923a-bc593ad84d23"); BlobId BLOB_ID_2 = new HashBlobId.Factory().from("05dcb33b-8382-4744-923a-bc593ad84d24"); BucketName BUCKET_NAME = BucketName.of("bucket-2019-06-01"); + BucketName OTHER_BUCKET_NAME = BucketName.of("other"); + StorageInformation STORAGE_INFORMATION = StorageInformation.builder() .bucketName(BUCKET_NAME) .blobId(BLOB_ID); + StorageInformation OTHER_STORAGE_INFORMATION = StorageInformation.builder() + .bucketName(OTHER_BUCKET_NAME) + .blobId(BLOB_ID_2); + + DeletedMessageWithStorageInformation DELETED_MESSAGE_2_OTHER_BUCKET = new DeletedMessageWithStorageInformation(DeletedMessageFixture.DELETED_MESSAGE_2, + OTHER_STORAGE_INFORMATION); + DeletedMessageWithStorageInformation DELETED_MESSAGE = new DeletedMessageWithStorageInformation(DeletedMessageFixture.DELETED_MESSAGE, STORAGE_INFORMATION); + DeletedMessageWithStorageInformation DELETED_MESSAGE_2 = new DeletedMessageWithStorageInformation(DeletedMessageFixture.DELETED_MESSAGE_2, STORAGE_INFORMATION); } --------------------------------------------------------------------- To unsubscribe, e-mail: 
server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org