JAMES-2082 Migration from V1 to V2 should be run asynchronously on a separate thread
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/eb55762a Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/eb55762a Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/eb55762a Branch: refs/heads/master Commit: eb55762a3c91998c11720c1f8535913230641215 Parents: 17ed1d4 Author: benwa <btell...@linagora.com> Authored: Fri Jul 7 09:39:29 2017 +0700 Committer: Antoine Duprat <adup...@linagora.com> Committed: Mon Jul 10 14:23:57 2017 +0200 ---------------------------------------------------------------------- mailbox/cassandra/pom.xml | 6 + .../CassandraMailboxSessionMapperFactory.java | 12 +- .../mail/CassandraMessageIdMapper.java | 4 +- .../cassandra/mail/CassandraMessageMapper.java | 14 +-- .../mail/migration/V1ToV2Migration.java | 67 +++++------ .../mail/migration/V1ToV2MigrationThread.java | 118 +++++++++++++++++++ .../mail/migration/V1ToV2MigrationTest.java | 61 +++++++--- 7 files changed, 216 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/pom.xml ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/pom.xml b/mailbox/cassandra/pom.xml index 91bbd95..1e39812 100644 --- a/mailbox/cassandra/pom.xml +++ b/mailbox/cassandra/pom.xml @@ -260,6 +260,12 @@ <artifactId>throwing-lambdas</artifactId> <version>0.5.0</version> </dependency> + <dependency> + <groupId>com.jayway.awaitility</groupId> + <artifactId>awaitility</artifactId> + <version>1.6.5</version> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java ---------------------------------------------------------------------- diff 
--git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java index 4ddd2db..cabd043 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java @@ -42,6 +42,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMessageMapper; import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider; import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider; import org.apache.james.mailbox.cassandra.mail.*; +import org.apache.james.mailbox.cassandra.mail.migration.V1ToV2Migration; import org.apache.james.mailbox.cassandra.user.CassandraSubscriptionMapper; import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.store.MailboxSessionMapperFactory; @@ -74,6 +75,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa private final CassandraMailboxPathDAO mailboxPathDAO; private final CassandraFirstUnseenDAO firstUnseenDAO; private final CassandraApplicableFlagDAO applicableFlagDAO; + private final V1ToV2Migration v1ToV2Migration; private CassandraUtils cassandraUtils; private CassandraConfiguration cassandraConfiguration; private final CassandraDeletedMessageDAO deletedMessageDAO; @@ -84,7 +86,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa CassandraMessageIdDAO messageIdDAO, CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMailboxCounterDAO mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentsDAO, CassandraMailboxDAO mailboxDAO, CassandraMailboxPathDAO mailboxPathDAO, CassandraFirstUnseenDAO firstUnseenDAO, CassandraApplicableFlagDAO applicableFlagDAO, - CassandraDeletedMessageDAO deletedMessageDAO, 
CassandraUtils cassandraUtils, CassandraConfiguration cassandraConfiguration) { + CassandraDeletedMessageDAO deletedMessageDAO, V1ToV2Migration v1ToV2Migration, CassandraUtils cassandraUtils, CassandraConfiguration cassandraConfiguration) { this.uidProvider = uidProvider; this.modSeqProvider = modSeqProvider; this.session = session; @@ -107,6 +109,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa firstUnseenDAO, applicableFlagDAO, deletedMessageDAO); + this.v1ToV2Migration = v1ToV2Migration; } public CassandraMailboxSessionMapperFactory( @@ -126,7 +129,8 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa CassandraDeletedMessageDAO deletedMesageDAO) { this(uidProvider, modSeqProvider, session, messageDAO, messageDAOV2, messageIdDAO, imapUidDAO, mailboxCounterDAO, - mailboxRecentsDAO, mailboxDAO, mailboxPathDAO, firstUnseenDAO, applicableFlagDAO, deletedMesageDAO, + mailboxRecentsDAO, mailboxDAO, mailboxPathDAO, firstUnseenDAO, applicableFlagDAO, deletedMesageDAO, + new V1ToV2Migration(messageDAO, messageDAOV2, new CassandraAttachmentMapper(session), CassandraConfiguration.DEFAULT_CONFIGURATION), CassandraUtils.WITH_DEFAULT_CONFIGURATION, CassandraConfiguration.DEFAULT_CONFIGURATION); } @@ -137,7 +141,6 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa modSeqProvider, null, (CassandraAttachmentMapper) createAttachmentMapper(mailboxSession), - messageDAO, messageDAOV2, messageIdDAO, imapUidDAO, @@ -147,6 +150,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa indexTableHandler, firstUnseenDAO, deletedMessageDAO, + v1ToV2Migration, cassandraConfiguration); } @@ -155,7 +159,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), mailboxDAO, (CassandraAttachmentMapper) getAttachmentMapper(mailboxSession), imapUidDAO, messageIdDAO, 
messageDAO, messageDAOV2, indexTableHandler, modSeqProvider, mailboxSession, - cassandraConfiguration); + v1ToV2Migration, cassandraConfiguration); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java index 479f43f..bf43a76 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java @@ -77,7 +77,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2, CassandraIndexTableHandler indexTableHandler, ModSeqProvider modSeqProvider, MailboxSession mailboxSession, - CassandraConfiguration cassandraConfiguration) { + V1ToV2Migration v1ToV2Migration, CassandraConfiguration cassandraConfiguration) { this.mailboxMapper = mailboxMapper; this.mailboxDAO = mailboxDAO; @@ -89,7 +89,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper { this.mailboxSession = mailboxSession; this.attachmentLoader = new AttachmentLoader(attachmentMapper); this.cassandraConfiguration = cassandraConfiguration; - this.v1ToV2Migration = new V1ToV2Migration(messageDAOV1, messageDAOV2, attachmentMapper, cassandraConfiguration); + this.v1ToV2Migration = v1ToV2Migration; } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java 
---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 3ed7842..b34f03c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -87,14 +87,14 @@ public class CassandraMessageMapper implements MessageMapper { private final V1ToV2Migration v1ToV2Migration; private final CassandraConfiguration cassandraConfiguration; - - public CassandraMessageMapper(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider, MailboxSession mailboxSession, CassandraAttachmentMapper attachmentMapper, - CassandraMessageDAO messageDAO, CassandraMessageDAOV2 messageDAOV2, - CassandraMessageIdDAO messageIdDAO, CassandraMessageIdToImapUidDAO imapUidDAO, - CassandraMailboxCounterDAO mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentDAO, CassandraApplicableFlagDAO applicableFlagDAO, - CassandraIndexTableHandler indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraConfiguration cassandraConfiguration) { + CassandraMessageDAOV2 messageDAOV2, CassandraMessageIdDAO messageIdDAO, + CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMailboxCounterDAO mailboxCounterDAO, + CassandraMailboxRecentsDAO mailboxRecentDAO, CassandraApplicableFlagDAO applicableFlagDAO, + CassandraIndexTableHandler indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO, + CassandraDeletedMessageDAO deletedMessageDAO, V1ToV2Migration v1ToV2Migration, + CassandraConfiguration cassandraConfiguration) { this.uidProvider = uidProvider; this.modSeqProvider = modSeqProvider; this.mailboxSession = mailboxSession; @@ -108,7 +108,7 @@ public class 
CassandraMessageMapper implements MessageMapper { this.attachmentLoader = new AttachmentLoader(attachmentMapper); this.applicableFlagDAO = applicableFlagDAO; this.deletedMessageDAO = deletedMessageDAO; - this.v1ToV2Migration = new V1ToV2Migration(messageDAO, messageDAOV2, attachmentMapper, cassandraConfiguration); + this.v1ToV2Migration = v1ToV2Migration; this.cassandraConfiguration = cassandraConfiguration; } http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java index 0a4463e..131addb 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java @@ -19,13 +19,17 @@ package org.apache.james.mailbox.cassandra.mail.migration; -import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.stream.IntStream; import java.util.stream.Stream; +import javax.annotation.PreDestroy; +import javax.inject.Inject; + import org.apache.commons.lang3.tuple.Pair; import org.apache.james.backends.cassandra.CassandraConfiguration; -import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; import org.apache.james.mailbox.cassandra.mail.AttachmentLoader; import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMapper; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO; @@ -33,28 +37,37 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2; import 
org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation; import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment; import org.apache.james.mailbox.cassandra.mail.utils.Limit; -import org.apache.james.mailbox.exception.MailboxException; import org.apache.james.mailbox.store.mail.MessageMapper; -import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.collect.EvictingQueue; import com.google.common.collect.ImmutableList; public class V1ToV2Migration { - private static final Logger LOGGER = LoggerFactory.getLogger(V1ToV2Migration.class); + private static final int MIGRATION_THREAD_COUNT = 2; + private static final int MIGRATION_QUEUE_LENGTH = 1000; private final CassandraMessageDAO messageDAOV1; - private final CassandraMessageDAOV2 messageDAOV2; private final AttachmentLoader attachmentLoader; private final CassandraConfiguration cassandraConfiguration; + private final ExecutorService migrationExecutor; + private final EvictingQueue<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated; + @Inject public V1ToV2Migration(CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2, CassandraAttachmentMapper attachmentMapper, CassandraConfiguration cassandraConfiguration) { this.messageDAOV1 = messageDAOV1; - this.messageDAOV2 = messageDAOV2; this.attachmentLoader = new AttachmentLoader(attachmentMapper); this.cassandraConfiguration = cassandraConfiguration; + this.migrationExecutor = Executors.newFixedThreadPool(MIGRATION_THREAD_COUNT); + this.messagesToBeMigrated = EvictingQueue.create(MIGRATION_QUEUE_LENGTH); + IntStream.range(0, MIGRATION_THREAD_COUNT) + .mapToObj(i -> new V1ToV2MigrationThread(messagesToBeMigrated, messageDAOV1, messageDAOV2, attachmentLoader)) + .forEach(migrationExecutor::execute); + } + + @PreDestroy + public void stop() { + migrationExecutor.shutdownNow(); } public 
CompletableFuture<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> @@ -67,37 +80,15 @@ public class V1ToV2Migration { return messageDAOV1.retrieveMessages(ImmutableList.of(result.getMetadata()), MessageMapper.FetchType.Full, Limit.unlimited()) .thenApply(results -> results.findAny() .orElseThrow(() -> new IllegalArgumentException("Message not found in DAO V1" + result.getMetadata()))) - .thenCompose(this::performV1ToV2Migration); - } - - private CompletableFuture<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> performV1ToV2Migration(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageV1) { - return attachmentLoader.addAttachmentToMessages(Stream.of(messageV1), MessageMapper.FetchType.Full) - .thenApply(stream -> stream.findAny().get()) - .thenCompose(this::performV1ToV2Migration) - .thenApply(any -> messageV1); - } - - private CompletableFuture<Void> performV1ToV2Migration(SimpleMailboxMessage message) { - if (!cassandraConfiguration.isOnTheFlyV1ToV2Migration()) { - return CompletableFuture.completedFuture(null); - } - return saveInV2FromV1(message) - .thenCompose(this::deleteInV1); - } - - private CompletableFuture<Void> deleteInV1(Optional<SimpleMailboxMessage> optional) { - return optional.map(SimpleMailboxMessage::getMessageId) - .map(messageId -> (CassandraMessageId) messageId) - .map(messageDAOV1::delete) - .orElse(CompletableFuture.completedFuture(null)); + .thenApply(this::submitMigration); } - private CompletableFuture<Optional<SimpleMailboxMessage>> saveInV2FromV1(SimpleMailboxMessage message) { - try { - return messageDAOV2.save(message).thenApply(any -> Optional.of(message)); - } catch (MailboxException e) { - LOGGER.error("Exception while saving message during migration", e); - return CompletableFuture.completedFuture(Optional.<SimpleMailboxMessage>empty()); + private Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> 
submitMigration(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageV1) { + if (cassandraConfiguration.isOnTheFlyV1ToV2Migration()) { + synchronized (messagesToBeMigrated) { + messagesToBeMigrated.add(messageV1); + } } + return messageV1; } } http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java new file mode 100644 index 0000000..acac3a3 --- /dev/null +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java @@ -0,0 +1,118 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.mailbox.cassandra.mail.migration; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; +import org.apache.james.mailbox.cassandra.mail.AttachmentLoader; +import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO; +import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2; +import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation; +import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment; +import org.apache.james.mailbox.exception.MailboxException; +import org.apache.james.mailbox.store.mail.MessageMapper; +import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Throwables; +import com.google.common.collect.EvictingQueue; + +public class V1ToV2MigrationThread implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(V1ToV2MigrationThread.class); + public static final int POLL_INTERVAL_IN_MS = 10; + + private final EvictingQueue<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated; + private final CassandraMessageDAO messageDAOV1; + private final CassandraMessageDAOV2 messageDAOV2; + private final AttachmentLoader attachmentLoader; + + public V1ToV2MigrationThread(EvictingQueue<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated, + CassandraMessageDAO messageDAOV1, CassandraMessageDAOV2 messageDAOV2, AttachmentLoader attachmentLoader) { + this.messagesToBeMigrated = messagesToBeMigrated; + this.messageDAOV1 = messageDAOV1; + this.messageDAOV2 = messageDAOV2; + this.attachmentLoader = attachmentLoader; + } + + @Override + 
public void run() { + while (true) { + try { + Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> message = dequeue(); + performV1ToV2Migration(message).join(); + } catch (Exception e) { + LOGGER.error("Error occured in migration thread", e); + } + } + } + + private Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> dequeue() { + while (true) { + Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> poll = poll(); + if (poll.isPresent()) { + return poll.get(); + } + try { + Thread.sleep(POLL_INTERVAL_IN_MS); + } catch (InterruptedException e) { + Throwables.propagate(e); + } + } + } + + private Optional<Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>>> poll() { + synchronized (messagesToBeMigrated) { + return Optional.ofNullable(messagesToBeMigrated.poll()); + } + } + + private CompletableFuture<Void> performV1ToV2Migration(Pair<MessageWithoutAttachment, Stream<MessageAttachmentRepresentation>> messageV1) { + return attachmentLoader.addAttachmentToMessages(Stream.of(messageV1), MessageMapper.FetchType.Full) + .thenApply(stream -> stream.findAny().get()) + .thenCompose(this::performV1ToV2Migration); + } + + private CompletableFuture<Void> performV1ToV2Migration(SimpleMailboxMessage message) { + return saveInV2FromV1(message) + .thenCompose(this::deleteInV1); + } + + private CompletableFuture<Void> deleteInV1(Optional<SimpleMailboxMessage> optional) { + return optional.map(SimpleMailboxMessage::getMessageId) + .map(messageId -> (CassandraMessageId) messageId) + .map(messageDAOV1::delete) + .orElse(CompletableFuture.completedFuture(null)); + } + + private CompletableFuture<Optional<SimpleMailboxMessage>> saveInV2FromV1(SimpleMailboxMessage message) { + try { + return messageDAOV2.save(message).thenApply(any -> Optional.of(message)); + } catch (MailboxException e) { + LOGGER.error("Exception while saving message during migration", e); + return 
CompletableFuture.completedFuture(Optional.<SimpleMailboxMessage>empty()); + } + } +} http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java index 96b2f95..57ec014 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java @@ -22,6 +22,8 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Date; import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; import javax.mail.Flags; import javax.mail.util.SharedByteArrayInputStream; @@ -33,12 +35,12 @@ import org.apache.james.backends.cassandra.init.CassandraModuleComposite; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.cassandra.ids.CassandraId; import org.apache.james.mailbox.cassandra.ids.CassandraMessageId; -import org.apache.james.mailbox.cassandra.mail.utils.Limit; import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMapper; import org.apache.james.mailbox.cassandra.mail.CassandraBlobsDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO; import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2; import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation; +import org.apache.james.mailbox.cassandra.mail.utils.Limit; import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule; import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule; import 
org.apache.james.mailbox.cassandra.modules.CassandraMessageModule; @@ -61,6 +63,9 @@ import org.junit.Test; import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; +import com.jayway.awaitility.Awaitility; +import com.jayway.awaitility.Duration; +import com.jayway.awaitility.core.ConditionFactory; public class V1ToV2MigrationTest { private static final int BODY_START = 16; @@ -85,6 +90,7 @@ public class V1ToV2MigrationTest { @Rule public final JUnitSoftAssertions softly = new JUnitSoftAssertions(); + private ConditionFactory awaitability; @Before public void setUp() { @@ -126,10 +132,18 @@ public class V1ToV2MigrationTest { .isInline(true) .name("toto.png") .build(); + + Duration slowPacedPollInterval = Duration.FIVE_HUNDRED_MILLISECONDS; + awaitability = Awaitility + .with() + .pollInterval(slowPacedPollInterval) + .and() + .pollDelay(slowPacedPollInterval).await(); } @After public void tearDown() { + testee.stop(); cassandra.clearAllTables(); cassandra.close(); } @@ -142,15 +156,13 @@ public class V1ToV2MigrationTest { testee.getFromV2orElseFromV1AfterMigration(CassandraMessageDAOV2.notFound(metaData)).join(); - Optional<CassandraMessageDAOV2.MessageResult> messageResult = messageDAOV2.retrieveMessages(metaDataList, MessageMapper.FetchType.Full, Limit.unlimited()) - .join() - .findAny(); + awaitMigration(); - assertThat(messageResult.isPresent()).isTrue(); - softly.assertThat(messageResult.get().message().getLeft().getMessageId()).isEqualTo(messageId); - softly.assertThat(IOUtils.toString(messageResult.get().message().getLeft().getContent(), Charsets.UTF_8)) + CassandraMessageDAOV2.MessageResult messageResult = retrieveMessageOnV2().get(); + softly.assertThat(messageResult.message().getLeft().getMessageId()).isEqualTo(messageId); + softly.assertThat(IOUtils.toString(messageResult.message().getLeft().getContent(), Charsets.UTF_8)) .isEqualTo(CONTENT); - 
softly.assertThat(messageResult.get().message().getRight().findAny().isPresent()).isFalse(); + softly.assertThat(messageResult.message().getRight().findAny().isPresent()).isFalse(); } @Test @@ -164,15 +176,13 @@ public class V1ToV2MigrationTest { testee.getFromV2orElseFromV1AfterMigration(CassandraMessageDAOV2.notFound(metaData)).join(); - Optional<CassandraMessageDAOV2.MessageResult> messageResult = messageDAOV2.retrieveMessages(metaDataList, MessageMapper.FetchType.Full, Limit.unlimited()) - .join() - .findAny(); + awaitMigration(); - assertThat(messageResult.isPresent()).isTrue(); - softly.assertThat(messageResult.get().message().getLeft().getMessageId()).isEqualTo(messageId); - softly.assertThat(IOUtils.toString(messageResult.get().message().getLeft().getContent(), Charsets.UTF_8)) + CassandraMessageDAOV2.MessageResult messageResult = retrieveMessageOnV2().get(); + softly.assertThat(messageResult.message().getLeft().getMessageId()).isEqualTo(messageId); + softly.assertThat(IOUtils.toString(messageResult.message().getLeft().getContent(), Charsets.UTF_8)) .isEqualTo(CONTENT); - softly.assertThat(messageResult.get().message().getRight().findAny().get()).isEqualTo(MessageAttachmentRepresentation.builder() + softly.assertThat(messageResult.message().getRight().findAny().get()).isEqualTo(MessageAttachmentRepresentation.builder() .attachmentId(attachment.getAttachmentId()) .cid(OptionalConverter.fromGuava(messageAttachment.getCid())) .isInline(messageAttachment.isInline()) @@ -180,6 +190,27 @@ public class V1ToV2MigrationTest { .build()); } + private void awaitMigration() { + awaitability.atMost(1, TimeUnit.MINUTES) + .until(() -> { + try { + retrieveMessageOnV2(); + return true; + } catch(AssertionError e) { + return false; + } + }); + } + + private Optional<CassandraMessageDAOV2.MessageResult> retrieveMessageOnV2() { + Optional<CassandraMessageDAOV2.MessageResult> messageResult = messageDAOV2.retrieveMessages(metaDataList, MessageMapper.FetchType.Full, 
Limit.unlimited()) + .join() + .findAny(); + + assertThat(messageResult.isPresent()).isTrue(); + return messageResult; + } + private SimpleMailboxMessage createMessage(MessageId messageId, String content, int bodyStart, PropertyBuilder propertyBuilder, List<MessageAttachment> attachments) { return new SimpleMailboxMessage(messageId, new Date(), content.length(), bodyStart, new SharedByteArrayInputStream(content.getBytes()), new Flags(), propertyBuilder, MAILBOX_ID, attachments); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org