This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 36a1d98079b0988cc045764691b99596274f0385 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Fri Jul 5 11:08:20 2019 +0200 JAMES-2810 Implement UserPerBucket DAO This will be needed to iterate in a bucket and will be used as a building block of the CassandraDeletedMessageMetadataVault cleanup. Note that a bucket is further partitioned by user to avoid wide rows; thus, iterating through users is required upon bucket deletion. --- .../metadata/DeletedMessageMetadataModule.java | 15 ++++ .../james/vault/metadata/UserPerBucketDAO.java | 87 ++++++++++++++++++ .../james/vault/metadata/UserPerBucketDAOTest.java | 100 +++++++++++++++++++++ 3 files changed, 202 insertions(+) diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageMetadataModule.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageMetadataModule.java index 6fff8a9..eceb87c 100644 --- a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageMetadataModule.java +++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageMetadataModule.java @@ -36,6 +36,13 @@ public interface DeletedMessageMetadataModule { String BLOB_ID = "blobId"; } + interface UserPerBucketTable { + String TABLE = "userPerBucket"; + + String BUCKET_NAME = "bucketName"; + String USER = "user"; + } + CassandraModule MODULE = CassandraModule .builder() @@ -49,5 +56,13 @@ public interface DeletedMessageMetadataModule { .addColumn(StorageInformationTable.BUCKET_NAME, text()) .addColumn(StorageInformationTable.BLOB_ID, text())) + .table(UserPerBucketTable.TABLE) + .comment("Holds user list having deletedMessages stored in a given bucket in the BlobStore based DeletedMessages vault") + .options(options -> options + .caching(SchemaBuilder.KeyCaching.ALL, SchemaBuilder.noRows())) + .statement(statement -> 
statement + .addPartitionKey(UserPerBucketTable.BUCKET_NAME, text()) + .addClusteringColumn(UserPerBucketTable.USER, text())) + .build(); } diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/UserPerBucketDAO.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/UserPerBucketDAO.java new file mode 100644 index 0000000..743b91d --- /dev/null +++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/UserPerBucketDAO.java @@ -0,0 +1,87 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.vault.metadata; + +import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker; +import static com.datastax.driver.core.querybuilder.QueryBuilder.delete; +import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; +import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto; +import static com.datastax.driver.core.querybuilder.QueryBuilder.select; +import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.UserPerBucketTable.BUCKET_NAME; +import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.UserPerBucketTable.TABLE; +import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.UserPerBucketTable.USER; + +import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; +import org.apache.james.blob.api.BucketName; +import org.apache.james.core.User; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +class UserPerBucketDAO { + private final CassandraAsyncExecutor cassandraAsyncExecutor; + private final PreparedStatement addStatement; + private final PreparedStatement removeStatement; + private final PreparedStatement listStatement; + + UserPerBucketDAO(Session session) { + cassandraAsyncExecutor = new CassandraAsyncExecutor(session); + addStatement = prepareAddUser(session); + removeStatement = prepareRemoveBucket(session); + listStatement = prepareListUser(session); + } + + private PreparedStatement prepareAddUser(Session session) { + return session.prepare(insertInto(TABLE) + .value(BUCKET_NAME, bindMarker(BUCKET_NAME)) + .value(USER, bindMarker(USER))); + } + + private PreparedStatement prepareRemoveBucket(Session session) { + return session.prepare(delete().from(TABLE) + .where(eq(BUCKET_NAME, bindMarker(BUCKET_NAME)))); + } + + private 
PreparedStatement prepareListUser(Session session) { + return session.prepare(select(USER).from(TABLE) + .where(eq(BUCKET_NAME, bindMarker(BUCKET_NAME)))); + } + + Flux<User> retrieveUsers(BucketName bucketName) { + return cassandraAsyncExecutor.executeRows(listStatement.bind() + .setString(BUCKET_NAME, bucketName.asString())) + .map(row -> row.getString(USER)) + .map(User::fromUsername); + } + + Mono<Void> addUser(BucketName bucketName, User user) { + return cassandraAsyncExecutor.executeVoid(addStatement.bind() + .setString(BUCKET_NAME, bucketName.asString()) + .setString(USER, user.asString())); + } + + Mono<Void> deleteBucket(BucketName bucketName) { + return cassandraAsyncExecutor.executeVoid(removeStatement.bind() + .setString(BUCKET_NAME, bucketName.asString())); + } +} diff --git a/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/UserPerBucketDAOTest.java b/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/UserPerBucketDAOTest.java new file mode 100644 index 0000000..94f655e --- /dev/null +++ b/mailbox/plugin/deleted-messages-vault-cassandra/src/test/java/org/apache/james/vault/metadata/UserPerBucketDAOTest.java @@ -0,0 +1,100 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. 
You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.vault.metadata; + +import static org.apache.james.vault.metadata.DeletedMessageMetadataModule.MODULE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.blob.api.BucketName; +import org.apache.james.core.User; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class UserPerBucketDAOTest { + private static final BucketName BUCKET_NAME = BucketName.of("deletedMessages-2019-06-01"); + private static final BucketName BUCKET_NAME_2 = BucketName.of("deletedMessages-2019-07-01"); + private static final User OWNER = User.fromUsername("owner"); + private static final User OWNER_2 = User.fromUsername("owner2"); + + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(MODULE); + + private UserPerBucketDAO testee; + + @BeforeEach + void setUp(CassandraCluster cassandra) { + testee = new UserPerBucketDAO(cassandra.getConf()); + } + + @Test + void retrieveUsersShouldReturnEmptyWhenNone() { + assertThat(testee.retrieveUsers(BUCKET_NAME).toStream()).isEmpty(); + } + + @Test + void retrieveUsersShouldReturnAddedUser() { + testee.addUser(BUCKET_NAME, OWNER).block(); + + 
assertThat(testee.retrieveUsers(BUCKET_NAME).toStream()).containsExactly(OWNER); + } + + @Test + void retrieveUsersShouldReturnAddedUsers() { + testee.addUser(BUCKET_NAME, OWNER).block(); + testee.addUser(BUCKET_NAME, OWNER_2).block(); + + assertThat(testee.retrieveUsers(BUCKET_NAME).toStream()).containsExactlyInAnyOrder(OWNER, OWNER_2); + } + + @Test + void retrieveUsersShouldNotReturnUsersOfOtherBuckets() { + testee.addUser(BUCKET_NAME, OWNER).block(); + testee.addUser(BUCKET_NAME_2, OWNER_2).block(); + + assertThat(testee.retrieveUsers(BUCKET_NAME).toStream()).containsExactlyInAnyOrder(OWNER); + } + + @Test + void addUserShouldBeIdempotent() { + testee.addUser(BUCKET_NAME, OWNER).block(); + testee.addUser(BUCKET_NAME, OWNER).block(); + + assertThat(testee.retrieveUsers(BUCKET_NAME).toStream()).containsExactlyInAnyOrder(OWNER); + } + + @Test + void retrieveUsersShouldReturnEmptyWhenDeletedBucket() { + testee.addUser(BUCKET_NAME, OWNER).block(); + + testee.deleteBucket(BUCKET_NAME).block(); + + assertThat(testee.retrieveUsers(BUCKET_NAME).toStream()).isEmpty(); + } + + @Test + void deleteBucketShouldNotThrowWhenNone() { + assertThatCode(() -> testee.deleteBucket(BUCKET_NAME).block()) + .doesNotThrowAnyException(); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org