JAMES-2647 Cassandra migration task for mapping sources
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d51c7085 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d51c7085 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d51c7085 Branch: refs/heads/master Commit: d51c7085443c947b3f77e64163d55800dc6e782b Parents: 8938f5f Author: Rene Cordier <rcord...@linagora.com> Authored: Mon Jan 21 15:02:55 2019 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Tue Jan 22 17:09:38 2019 +0700 ---------------------------------------------------------------------- .../versions/CassandraSchemaVersionManager.java | 2 +- .../guice/protocols/webadmin-cassandra/pom.xml | 4 + .../modules/server/CassandraRoutesModule.java | 3 + server/data/data-cassandra/pom.xml | 5 + .../cassandra/CassandraMappingsSourcesDAO.java | 8 +- .../CassandraRecipientRewriteTable.java | 9 +- .../CassandraRecipientRewriteTableDAO.java | 45 ++---- .../migration/MappingsSourcesMigration.java | 65 ++++++++ .../CassandraMappingsSourcesDAOTest.java | 1 - .../CassandraRecipientRewriteTableDAOTest.java | 10 +- .../migration/MappingsSourcesMigrationTest.java | 151 +++++++++++++++++++ 11 files changed, 256 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java ---------------------------------------------------------------------- diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java index d953f7e..172fdb5 100644 --- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java +++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/versions/CassandraSchemaVersionManager.java @@ -34,7 +34,7 @@ import com.google.common.base.Preconditions; public class CassandraSchemaVersionManager { public static final SchemaVersion MIN_VERSION = new SchemaVersion(2); - public static final SchemaVersion MAX_VERSION = new SchemaVersion(6); + public static final SchemaVersion MAX_VERSION = new SchemaVersion(7); public static final SchemaVersion DEFAULT_VERSION = MIN_VERSION; private static final Logger LOGGER = LoggerFactory.getLogger(CassandraSchemaVersionManager.class); http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/container/guice/protocols/webadmin-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/server/container/guice/protocols/webadmin-cassandra/pom.xml b/server/container/guice/protocols/webadmin-cassandra/pom.xml index 6554da8..4f7fbcb 100644 --- a/server/container/guice/protocols/webadmin-cassandra/pom.xml +++ b/server/container/guice/protocols/webadmin-cassandra/pom.xml @@ -41,6 +41,10 @@ <artifactId>james-server-webadmin-cassandra</artifactId> </dependency> <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-data-cassandra</artifactId> + </dependency> + <dependency> <groupId>com.google.inject</groupId> <artifactId>guice</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java ---------------------------------------------------------------------- diff --git a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java index 7513df4..01e4caa 100644 --- a/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java +++ b/server/container/guice/protocols/webadmin-cassandra/src/main/java/org/apache/james/modules/server/CassandraRoutesModule.java @@ -26,6 +26,7 @@ import org.apache.james.backends.cassandra.versions.SchemaVersion; import org.apache.james.mailbox.cassandra.mail.migration.AttachmentMessageIdCreation; import org.apache.james.mailbox.cassandra.mail.migration.AttachmentV2Migration; import org.apache.james.mailbox.cassandra.mail.migration.MailboxPathV2Migration; +import org.apache.james.rrt.cassandra.migration.MappingsSourcesMigration; import org.apache.james.webadmin.Routes; import org.apache.james.webadmin.routes.CassandraMailboxMergingRoutes; import org.apache.james.webadmin.routes.CassandraMigrationRoutes; @@ -41,6 +42,7 @@ public class CassandraRoutesModule extends AbstractModule { private static final SchemaVersion FROM_V3_TO_V4 = new SchemaVersion(3); private static final SchemaVersion FROM_V4_TO_V5 = new SchemaVersion(4); private static final SchemaVersion FROM_V5_TO_V6 = new SchemaVersion(5); + private static final SchemaVersion FROM_V6_TO_V7 = new SchemaVersion(6); @Override protected void configure() { @@ -57,6 +59,7 @@ public class CassandraRoutesModule extends AbstractModule { allMigrationClazzBinder.addBinding(FROM_V3_TO_V4).to(AttachmentV2Migration.class); allMigrationClazzBinder.addBinding(FROM_V4_TO_V5).to(AttachmentMessageIdCreation.class); allMigrationClazzBinder.addBinding(FROM_V5_TO_V6).to(MailboxPathV2Migration.class); + allMigrationClazzBinder.addBinding(FROM_V6_TO_V7).to(MappingsSourcesMigration.class); bind(SchemaVersion.class) .annotatedWith(Names.named(CassandraMigrationService.LATEST_VERSION)) http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/pom.xml b/server/data/data-cassandra/pom.xml index 01780b5..bd3220a 100644 --- a/server/data/data-cassandra/pom.xml +++ b/server/data/data-cassandra/pom.xml @@ -150,6 +150,11 @@ <artifactId>testcontainers</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java index 777a50e..6ff3072 100644 --- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java +++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAO.java @@ -41,14 +41,14 @@ import com.datastax.driver.core.Session; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -class CassandraMappingsSourcesDAO { +public class CassandraMappingsSourcesDAO { private final CassandraAsyncExecutor executor; private final PreparedStatement insertStatement; private final PreparedStatement deleteStatement; private final PreparedStatement retrieveSourcesStatement; @Inject - CassandraMappingsSourcesDAO(Session session) { + public CassandraMappingsSourcesDAO(Session session) { this.executor = new CassandraAsyncExecutor(session); this.insertStatement = prepareInsertStatement(session); this.deleteStatement = prepareDelete(session); @@ -77,7 +77,7 @@ class CassandraMappingsSourcesDAO { .and(eq(MAPPING_VALUE, bindMarker(MAPPING_VALUE)))); } - Mono<Void> addMapping(Mapping mapping, MappingSource source) { + public Mono<Void> addMapping(Mapping mapping, MappingSource source) { return executor.executeVoidReactor(insertStatement.bind() .setString(MAPPING_TYPE, mapping.getType().asPrefix()) .setString(MAPPING_VALUE, mapping.getMappingValue()) @@ -91,7 +91,7 @@ class CassandraMappingsSourcesDAO { .setString(SOURCE, source.asMailAddressString())); } - Flux<MappingSource> retrieveSources(Mapping mapping) { + public Flux<MappingSource> retrieveSources(Mapping mapping) { return executor.executeReactor(retrieveSourcesStatement.bind() .setString(MAPPING_TYPE, mapping.getType().asPrefix()) .setString(MAPPING_VALUE, mapping.getMappingValue())) http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java index 4c7a92b..0a51ec1 100644 --- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java +++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTable.java @@ -30,6 +30,8 @@ import org.apache.james.rrt.lib.Mappings; import org.apache.james.rrt.lib.MappingsImpl; import org.apache.james.util.OptionalUtils; +import com.github.steveash.guavate.Guavate; + public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTable { private final CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO; private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO; @@ -63,7 +65,12 @@ public class CassandraRecipientRewriteTable extends AbstractRecipientRewriteTabl @Override public Map<MappingSource, Mappings> getAllMappings() { - return cassandraRecipientRewriteTableDAO.getAllMappings().block(); + return cassandraRecipientRewriteTableDAO.getAllMappings() + .collect(Guavate.toImmutableMap( + pair -> pair.getLeft(), + pair -> MappingsImpl.fromMappings(pair.getRight()), + Mappings::union)) + .block(); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java index 52f3516..fe31f1f 100644 --- a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java +++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAO.java @@ -29,24 +29,23 @@ import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTab import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.TABLE_NAME; import static org.apache.james.rrt.cassandra.tables.CassandraRecipientRewriteTableTable.USER; -import java.util.Map; - import javax.inject.Inject; +import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor; import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.rrt.lib.Mapping; import org.apache.james.rrt.lib.MappingSource; -import org.apache.james.rrt.lib.Mappings; import org.apache.james.rrt.lib.MappingsImpl; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; import com.github.steveash.guavate.Guavate; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -class CassandraRecipientRewriteTableDAO { +public class CassandraRecipientRewriteTableDAO { private final CassandraAsyncExecutor executor; private final CassandraUtils cassandraUtils; private final PreparedStatement insertStatement; @@ -55,7 +54,7 @@ class CassandraRecipientRewriteTableDAO { private final PreparedStatement retrieveAllMappingsStatement; @Inject - CassandraRecipientRewriteTableDAO(Session session, CassandraUtils cassandraUtils) { + public CassandraRecipientRewriteTableDAO(Session session, CassandraUtils cassandraUtils) { this.executor = new CassandraAsyncExecutor(session); this.cassandraUtils = cassandraUtils; this.insertStatement = prepareInsertStatement(session); @@ -91,7 +90,7 @@ class CassandraRecipientRewriteTableDAO { .value(MAPPING, bindMarker(MAPPING))); } - Mono<Void> addMapping(MappingSource source, Mapping mapping) { + public Mono<Void> addMapping(MappingSource source, Mapping mapping) { return executor.executeVoidReactor(insertStatement.bind() .setString(USER, source.getFixedUser()) .setString(DOMAIN, source.getFixedDomain()) @@ -116,35 +115,11 @@ class CassandraRecipientRewriteTableDAO { .filter(mappings -> !mappings.isEmpty()); } - Mono<Map<MappingSource, Mappings>> getAllMappings() { + public Flux<Pair<MappingSource, Mapping>> getAllMappings() { return executor.executeReactor(retrieveAllMappingsStatement.bind()) - .map(resultSet -> cassandraUtils.convertToStream(resultSet) - .map(row -> new UserMapping(MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)), row.getString(MAPPING))) - .collect(Guavate.toImmutableMap( - UserMapping::getSource, - UserMapping::toMapping, - Mappings::union))); - } - - private static class UserMapping { - private final MappingSource source; - private final String mapping; - - UserMapping(MappingSource source, String mapping) { - this.source = source; - this.mapping = mapping; - } - - MappingSource getSource() { - return source; - } - - String getMapping() { - return mapping; - } - - Mappings toMapping() { - return MappingsImpl.fromRawString(getMapping()); - } + .flatMapMany(Flux::fromIterable) + .map(row -> Pair.of( + MappingSource.fromUser(row.getString(USER), row.getString(DOMAIN)), + Mapping.of(row.getString(MAPPING)))); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java new file mode 100644 index 0000000..226add2 --- /dev/null +++ b/server/data/data-cassandra/src/main/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigration.java @@ -0,0 +1,65 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.rrt.cassandra.migration; + +import javax.inject.Inject; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.migration.Migration; +import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO; +import org.apache.james.rrt.cassandra.CassandraRecipientRewriteTableDAO; +import org.apache.james.rrt.lib.Mapping; +import org.apache.james.rrt.lib.MappingSource; +import org.apache.james.task.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Mono; + +public class MappingsSourcesMigration implements Migration { + private static final Logger LOGGER = LoggerFactory.getLogger(MappingsSourcesMigration.class); + private final CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO; + private final CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO; + + @Inject + public MappingsSourcesMigration(CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO, + CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO) { + this.cassandraRecipientRewriteTableDAO = cassandraRecipientRewriteTableDAO; + this.cassandraMappingsSourcesDAO = cassandraMappingsSourcesDAO; + } + + + @Override + public Result run() { + return cassandraRecipientRewriteTableDAO.getAllMappings() + .flatMap(this::migrate) + .reduce(Result.COMPLETED, Task::combine) + .doOnError(e -> LOGGER.error("Error while migrating mappings sources", e)) + .onErrorResume(e -> Mono.just(Result.PARTIAL)) + .block(); + } + + private Mono<Result> migrate(Pair<MappingSource, Mapping> mappingEntry) { + return cassandraMappingsSourcesDAO.addMapping(mappingEntry.getRight(), mappingEntry.getLeft()) + .map(any -> Result.COMPLETED) + .doOnError(e -> LOGGER.error("Error while performing migration of mappings sources", e)) + .onErrorResume(e -> Mono.just(Result.PARTIAL)); + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java index 5d0a125..b277a4e 100644 --- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java +++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraMappingsSourcesDAOTest.java @@ -23,7 +23,6 @@ import static org.assertj.core.api.Assertions.assertThat; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; -import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.core.Domain; import org.apache.james.rrt.lib.Mapping; import org.apache.james.rrt.lib.MappingSource; http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java index 3abc541..1654166 100644 --- a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java +++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/CassandraRecipientRewriteTableDAOTest.java @@ -58,7 +58,7 @@ class CassandraRecipientRewriteTableDAOTest { @Test void getAllMappingsShouldReturnEmptyByDefault() { - assertThat(dao.getAllMappings().block()).isEmpty(); + assertThat(dao.getAllMappings().collectList().block()).isEmpty(); } @Test @@ -72,7 +72,7 @@ class CassandraRecipientRewriteTableDAOTest { void getAllMappingsShouldReturnStoredMapping() { dao.addMapping(SOURCE, MAPPING).block(); - assertThat(dao.getAllMappings().block()).contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING))); + assertThat(dao.getAllMappings().collectList().block()).contains(Pair.of(SOURCE, MAPPING)); } @Test @@ -90,7 +90,7 @@ class CassandraRecipientRewriteTableDAOTest { dao.removeMapping(SOURCE, MAPPING).block(); - assertThat(dao.getAllMappings().block()).isEmpty(); + assertThat(dao.getAllMappings().collectList().block()).isEmpty(); } @Test @@ -107,7 +107,7 @@ class CassandraRecipientRewriteTableDAOTest { dao.addMapping(SOURCE, MAPPING).block(); dao.addMapping(SOURCE, MAPPING_2).block(); - assertThat(dao.getAllMappings().block()) - .contains(Pair.of(SOURCE, MappingsImpl.fromMappings(MAPPING, MAPPING_2))); + assertThat(dao.getAllMappings().collectList().block()) + .contains(Pair.of(SOURCE, MAPPING), Pair.of(SOURCE, MAPPING_2)); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/d51c7085/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java ---------------------------------------------------------------------- diff --git a/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java new file mode 100644 index 0000000..d2c1c69 --- /dev/null +++ b/server/data/data-cassandra/src/test/java/org/apache/james/rrt/cassandra/migration/MappingsSourcesMigrationTest.java @@ -0,0 +1,151 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.rrt.cassandra.migration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.stream.IntStream; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.backends.cassandra.CassandraCluster; +import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.migration.Migration; +import org.apache.james.backends.cassandra.utils.CassandraUtils; +import org.apache.james.core.Domain; +import org.apache.james.rrt.cassandra.CassandraMappingsSourcesDAO; +import org.apache.james.rrt.cassandra.CassandraRRTModule; +import org.apache.james.rrt.cassandra.CassandraRecipientRewriteTableDAO; +import org.apache.james.rrt.lib.Mapping; +import org.apache.james.rrt.lib.MappingSource; +import org.apache.james.task.Task; +import org.apache.james.util.concurrency.ConcurrentTestRunner; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import reactor.core.publisher.Flux; + +class MappingsSourcesMigrationTest { + private static final int THREAD_COUNT = 10; + private static final int OPERATION_COUNT = 10; + private static final int MAPPING_COUNT = 100; + + private static final String USER = "test"; + private static final String ADDRESS = "test@domain"; + private static final MappingSource SOURCE = MappingSource.fromUser(USER, Domain.LOCALHOST); + private static final Mapping MAPPING = Mapping.alias(ADDRESS); + + @RegisterExtension + static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraRRTModule.MODULE); + + private CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO; + private CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO; + + private MappingsSourcesMigration migration; + + @BeforeEach + void setUp(CassandraCluster cassandra) { + cassandraRecipientRewriteTableDAO = new CassandraRecipientRewriteTableDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION); + cassandraMappingsSourcesDAO = new CassandraMappingsSourcesDAO(cassandra.getConf()); + + migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO); + } + + @Test + void emptyMigrationShouldSucceed() { + assertThat(migration.run()).isEqualTo(Migration.Result.COMPLETED); + } + + @Test + void migrationShouldSucceedWithData() { + cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block(); + + assertThat(migration.run()).isEqualTo(Task.Result.COMPLETED); + } + + @Test + void migrationShouldCreateMappingSourceFromMapping() { + cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block(); + + migration.run(); + + assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block()) + .containsExactly(SOURCE); + } + + @Test + void migrationShouldCreateMultipleMappingSourcesFromMappings() { + MappingSource source2 = MappingSource.fromUser("bob", Domain.LOCALHOST); + + cassandraRecipientRewriteTableDAO.addMapping(SOURCE, MAPPING).block(); + cassandraRecipientRewriteTableDAO.addMapping(source2, MAPPING).block(); + + migration.run(); + + assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block()) + .containsOnly(SOURCE, source2); + } + + @Test + void migrationShouldReturnPartialWhenGetAllMappingsFromMappingsFail() { + CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class); + CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO = mock(CassandraMappingsSourcesDAO.class); + migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO); + + when(cassandraRecipientRewriteTableDAO.getAllMappings()).thenReturn(Flux.error(new RuntimeException())); + + assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); + } + + @Test + void migrationShouldReturnPartialAddMappingFails() { + CassandraRecipientRewriteTableDAO cassandraRecipientRewriteTableDAO = mock(CassandraRecipientRewriteTableDAO.class); + CassandraMappingsSourcesDAO cassandraMappingsSourcesDAO = mock(CassandraMappingsSourcesDAO.class); + migration = new MappingsSourcesMigration(cassandraRecipientRewriteTableDAO, cassandraMappingsSourcesDAO); + + when(cassandraRecipientRewriteTableDAO.getAllMappings()) + .thenReturn(Flux.just(Pair.of(SOURCE, MAPPING))); + when(cassandraMappingsSourcesDAO.addMapping(any(Mapping.class), any(MappingSource.class))) + .thenThrow(new RuntimeException()); + + assertThat(migration.run()).isEqualTo(Migration.Result.PARTIAL); + } + + @Test + void migrationShouldBeIdempotentWhenRunMultipleTimes() throws ExecutionException, InterruptedException { + IntStream.range(0, MAPPING_COUNT) + .forEach(i -> cassandraRecipientRewriteTableDAO + .addMapping(MappingSource.parse("source" + i + "@domain"), MAPPING).block()); + + ConcurrentTestRunner.builder() + .operation((threadNumber, step) -> migration.run()) + .threadCount(THREAD_COUNT) + .operationCount(OPERATION_COUNT) + .runSuccessfullyWithin(Duration.ofMinutes(1)); + + assertThat(cassandraMappingsSourcesDAO.retrieveSources(MAPPING).collectList().block()) + .hasSize(MAPPING_COUNT); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org