JAMES-2082 Migration from V1 to V2 should be run asynchronously on a separate
thread


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/eb55762a
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/eb55762a
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/eb55762a

Branch: refs/heads/master
Commit: eb55762a3c91998c11720c1f8535913230641215
Parents: 17ed1d4
Author: benwa <btell...@linagora.com>
Authored: Fri Jul 7 09:39:29 2017 +0700
Committer: Antoine Duprat <adup...@linagora.com>
Committed: Mon Jul 10 14:23:57 2017 +0200

----------------------------------------------------------------------
 mailbox/cassandra/pom.xml                       |   6 +
 .../CassandraMailboxSessionMapperFactory.java   |  12 +-
 .../mail/CassandraMessageIdMapper.java          |   4 +-
 .../cassandra/mail/CassandraMessageMapper.java  |  14 +--
 .../mail/migration/V1ToV2Migration.java         |  67 +++++------
 .../mail/migration/V1ToV2MigrationThread.java   | 118 +++++++++++++++++++
 .../mail/migration/V1ToV2MigrationTest.java     |  61 +++++++---
 7 files changed, 216 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/pom.xml
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/pom.xml b/mailbox/cassandra/pom.xml
index 91bbd95..1e39812 100644
--- a/mailbox/cassandra/pom.xml
+++ b/mailbox/cassandra/pom.xml
@@ -260,6 +260,12 @@
                     <artifactId>throwing-lambdas</artifactId>
                     <version>0.5.0</version>
                 </dependency>
+                <dependency>
+                    <groupId>com.jayway.awaitility</groupId>
+                    <artifactId>awaitility</artifactId>
+                    <version>1.6.5</version>
+                    <scope>test</scope>
+                </dependency>
             </dependencies>
             <build>
                 <plugins>

http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 4ddd2db..cabd043 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -42,6 +42,7 @@ import 
org.apache.james.mailbox.cassandra.mail.CassandraMessageMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
 import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
 import org.apache.james.mailbox.cassandra.mail.*;
+import org.apache.james.mailbox.cassandra.mail.migration.V1ToV2Migration;
 import org.apache.james.mailbox.cassandra.user.CassandraSubscriptionMapper;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
@@ -74,6 +75,7 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
     private final CassandraMailboxPathDAO mailboxPathDAO;
     private final CassandraFirstUnseenDAO firstUnseenDAO;
     private final CassandraApplicableFlagDAO applicableFlagDAO;
+    private final V1ToV2Migration v1ToV2Migration;
     private CassandraUtils cassandraUtils;
     private CassandraConfiguration cassandraConfiguration;
     private final CassandraDeletedMessageDAO deletedMessageDAO;
@@ -84,7 +86,7 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
                                                 CassandraMessageIdDAO 
messageIdDAO, CassandraMessageIdToImapUidDAO imapUidDAO,
                                                 CassandraMailboxCounterDAO 
mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentsDAO, 
CassandraMailboxDAO mailboxDAO,
                                                 CassandraMailboxPathDAO 
mailboxPathDAO, CassandraFirstUnseenDAO firstUnseenDAO, 
CassandraApplicableFlagDAO applicableFlagDAO,
-                                                CassandraDeletedMessageDAO 
deletedMessageDAO, CassandraUtils cassandraUtils, CassandraConfiguration 
cassandraConfiguration) {
+                                                CassandraDeletedMessageDAO 
deletedMessageDAO, V1ToV2Migration v1ToV2Migration, CassandraUtils 
cassandraUtils, CassandraConfiguration cassandraConfiguration) {
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
         this.session = session;
@@ -107,6 +109,7 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
             firstUnseenDAO,
             applicableFlagDAO,
             deletedMessageDAO);
+        this.v1ToV2Migration = v1ToV2Migration;
     }
 
     public CassandraMailboxSessionMapperFactory(
@@ -126,7 +129,8 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
         CassandraDeletedMessageDAO deletedMesageDAO) {
 
         this(uidProvider, modSeqProvider, session, messageDAO, messageDAOV2, 
messageIdDAO, imapUidDAO, mailboxCounterDAO,
-             mailboxRecentsDAO, mailboxDAO, mailboxPathDAO, firstUnseenDAO, 
applicableFlagDAO, deletedMesageDAO,
+            mailboxRecentsDAO, mailboxDAO, mailboxPathDAO, firstUnseenDAO, 
applicableFlagDAO, deletedMesageDAO,
+            new V1ToV2Migration(messageDAO, messageDAOV2, new 
CassandraAttachmentMapper(session), 
CassandraConfiguration.DEFAULT_CONFIGURATION),
             CassandraUtils.WITH_DEFAULT_CONFIGURATION, 
CassandraConfiguration.DEFAULT_CONFIGURATION);
     }
 
@@ -137,7 +141,6 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
                                           modSeqProvider,
                                           null,
                                           (CassandraAttachmentMapper) 
createAttachmentMapper(mailboxSession),
-                                          messageDAO,
                                           messageDAOV2,
                                           messageIdDAO,
                                           imapUidDAO,
@@ -147,6 +150,7 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
                                           indexTableHandler,
                                           firstUnseenDAO,
                                           deletedMessageDAO,
+                                          v1ToV2Migration,
                                           cassandraConfiguration);
     }
 
@@ -155,7 +159,7 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
         return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), 
mailboxDAO,
                 (CassandraAttachmentMapper) 
getAttachmentMapper(mailboxSession),
                 imapUidDAO, messageIdDAO, messageDAO, messageDAOV2, 
indexTableHandler, modSeqProvider, mailboxSession,
-                cassandraConfiguration);
+                v1ToV2Migration, cassandraConfiguration);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 479f43f..bf43a76 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -77,7 +77,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
                                     CassandraMessageIdToImapUidDAO imapUidDAO, 
CassandraMessageIdDAO messageIdDAO,
                                     CassandraMessageDAO messageDAOV1, 
CassandraMessageDAOV2 messageDAOV2,
                                     CassandraIndexTableHandler 
indexTableHandler, ModSeqProvider modSeqProvider, MailboxSession mailboxSession,
-                                    CassandraConfiguration 
cassandraConfiguration) {
+                                    V1ToV2Migration v1ToV2Migration, 
CassandraConfiguration cassandraConfiguration) {
 
         this.mailboxMapper = mailboxMapper;
         this.mailboxDAO = mailboxDAO;
@@ -89,7 +89,7 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
         this.mailboxSession = mailboxSession;
         this.attachmentLoader = new AttachmentLoader(attachmentMapper);
         this.cassandraConfiguration = cassandraConfiguration;
-        this.v1ToV2Migration = new V1ToV2Migration(messageDAOV1, messageDAOV2, 
attachmentMapper, cassandraConfiguration);
+        this.v1ToV2Migration = v1ToV2Migration;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 3ed7842..b34f03c 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -87,14 +87,14 @@ public class CassandraMessageMapper implements 
MessageMapper {
     private final V1ToV2Migration v1ToV2Migration;
     private final CassandraConfiguration cassandraConfiguration;
 
-
-
     public CassandraMessageMapper(CassandraUidProvider uidProvider, 
CassandraModSeqProvider modSeqProvider,
                                   MailboxSession mailboxSession, 
CassandraAttachmentMapper attachmentMapper,
-                                  CassandraMessageDAO messageDAO, 
CassandraMessageDAOV2 messageDAOV2,
-                                  CassandraMessageIdDAO messageIdDAO, 
CassandraMessageIdToImapUidDAO imapUidDAO,
-                                  CassandraMailboxCounterDAO 
mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentDAO, 
CassandraApplicableFlagDAO applicableFlagDAO,
-                                  CassandraIndexTableHandler 
indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO, 
CassandraDeletedMessageDAO deletedMessageDAO, CassandraConfiguration 
cassandraConfiguration) {
+                                  CassandraMessageDAOV2 messageDAOV2, 
CassandraMessageIdDAO messageIdDAO,
+                                  CassandraMessageIdToImapUidDAO imapUidDAO, 
CassandraMailboxCounterDAO mailboxCounterDAO,
+                                  CassandraMailboxRecentsDAO mailboxRecentDAO, 
CassandraApplicableFlagDAO applicableFlagDAO,
+                                  CassandraIndexTableHandler 
indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO,
+                                  CassandraDeletedMessageDAO 
deletedMessageDAO, V1ToV2Migration v1ToV2Migration,
+                                  CassandraConfiguration 
cassandraConfiguration) {
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
         this.mailboxSession = mailboxSession;
@@ -108,7 +108,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
         this.attachmentLoader = new AttachmentLoader(attachmentMapper);
         this.applicableFlagDAO = applicableFlagDAO;
         this.deletedMessageDAO = deletedMessageDAO;
-        this.v1ToV2Migration = new V1ToV2Migration(messageDAO, messageDAOV2, 
attachmentMapper, cassandraConfiguration);
+        this.v1ToV2Migration = v1ToV2Migration;
         this.cassandraConfiguration = cassandraConfiguration;
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
index 0a4463e..131addb 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2Migration.java
@@ -19,13 +19,17 @@
 
 package org.apache.james.mailbox.cassandra.mail.migration;
 
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.CassandraConfiguration;
-import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
 import org.apache.james.mailbox.cassandra.mail.AttachmentLoader;
 import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
@@ -33,28 +37,37 @@ import 
org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
 import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation;
 import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment;
 import org.apache.james.mailbox.cassandra.mail.utils.Limit;
-import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.store.mail.MessageMapper;
-import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.EvictingQueue;
 import com.google.common.collect.ImmutableList;
 
 public class V1ToV2Migration {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(V1ToV2Migration.class);
+    private static final int MIGRATION_THREAD_COUNT = 2;
+    private static final int MIGRATION_QUEUE_LENGTH = 1000;
 
     private final CassandraMessageDAO messageDAOV1;
-    private final CassandraMessageDAOV2 messageDAOV2;
     private final AttachmentLoader attachmentLoader;
     private final CassandraConfiguration cassandraConfiguration;
+    private final ExecutorService migrationExecutor;
+    private final EvictingQueue<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated;
 
+    @Inject
     public V1ToV2Migration(CassandraMessageDAO messageDAOV1, 
CassandraMessageDAOV2 messageDAOV2,
                            CassandraAttachmentMapper attachmentMapper, 
CassandraConfiguration cassandraConfiguration) {
         this.messageDAOV1 = messageDAOV1;
-        this.messageDAOV2 = messageDAOV2;
         this.attachmentLoader = new AttachmentLoader(attachmentMapper);
         this.cassandraConfiguration = cassandraConfiguration;
+        this.migrationExecutor = 
Executors.newFixedThreadPool(MIGRATION_THREAD_COUNT);
+        this.messagesToBeMigrated = 
EvictingQueue.create(MIGRATION_QUEUE_LENGTH);
+        IntStream.range(0, MIGRATION_THREAD_COUNT)
+            .mapToObj(i -> new V1ToV2MigrationThread(messagesToBeMigrated, 
messageDAOV1, messageDAOV2, attachmentLoader))
+            .forEach(migrationExecutor::execute);
+    }
+
+    @PreDestroy
+    public void stop() {
+        migrationExecutor.shutdownNow();
     }
 
     public CompletableFuture<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>>
@@ -67,37 +80,15 @@ public class V1ToV2Migration {
         return 
messageDAOV1.retrieveMessages(ImmutableList.of(result.getMetadata()), 
MessageMapper.FetchType.Full, Limit.unlimited())
             .thenApply(results -> results.findAny()
                 .orElseThrow(() -> new IllegalArgumentException("Message not 
found in DAO V1" + result.getMetadata())))
-            .thenCompose(this::performV1ToV2Migration);
-    }
-
-    private CompletableFuture<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>> 
performV1ToV2Migration(Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> messageV1) {
-        return attachmentLoader.addAttachmentToMessages(Stream.of(messageV1), 
MessageMapper.FetchType.Full)
-            .thenApply(stream -> stream.findAny().get())
-            .thenCompose(this::performV1ToV2Migration)
-            .thenApply(any -> messageV1);
-    }
-
-    private CompletableFuture<Void> 
performV1ToV2Migration(SimpleMailboxMessage message) {
-        if (!cassandraConfiguration.isOnTheFlyV1ToV2Migration()) {
-            return CompletableFuture.completedFuture(null);
-        }
-        return saveInV2FromV1(message)
-            .thenCompose(this::deleteInV1);
-    }
-
-    private CompletableFuture<Void> deleteInV1(Optional<SimpleMailboxMessage> 
optional) {
-        return optional.map(SimpleMailboxMessage::getMessageId)
-            .map(messageId -> (CassandraMessageId) messageId)
-            .map(messageDAOV1::delete)
-            .orElse(CompletableFuture.completedFuture(null));
+            .thenApply(this::submitMigration);
     }
 
-    private CompletableFuture<Optional<SimpleMailboxMessage>> 
saveInV2FromV1(SimpleMailboxMessage message) {
-        try {
-            return messageDAOV2.save(message).thenApply(any -> 
Optional.of(message));
-        } catch (MailboxException e) {
-            LOGGER.error("Exception while saving message during migration", e);
-            return 
CompletableFuture.completedFuture(Optional.<SimpleMailboxMessage>empty());
+    private Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> 
submitMigration(Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> messageV1) {
+        if (cassandraConfiguration.isOnTheFlyV1ToV2Migration()) {
+            synchronized (messagesToBeMigrated) {
+                messagesToBeMigrated.add(messageV1);
+            }
         }
+        return messageV1;
     }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
new file mode 100644
index 0000000..acac3a3
--- /dev/null
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationThread.java
@@ -0,0 +1,118 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.mailbox.cassandra.mail.migration;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.AttachmentLoader;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
+import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation;
+import org.apache.james.mailbox.cassandra.mail.MessageWithoutAttachment;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.EvictingQueue;
+
+public class V1ToV2MigrationThread implements Runnable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(V1ToV2MigrationThread.class);
+    public static final int POLL_INTERVAL_IN_MS = 10;
+
+    private final EvictingQueue<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated;
+    private final CassandraMessageDAO messageDAOV1;
+    private final CassandraMessageDAOV2 messageDAOV2;
+    private final AttachmentLoader attachmentLoader;
+
+    public V1ToV2MigrationThread(EvictingQueue<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>> messagesToBeMigrated,
+                                 CassandraMessageDAO messageDAOV1, 
CassandraMessageDAOV2 messageDAOV2, AttachmentLoader attachmentLoader) {
+        this.messagesToBeMigrated = messagesToBeMigrated;
+        this.messageDAOV1 = messageDAOV1;
+        this.messageDAOV2 = messageDAOV2;
+        this.attachmentLoader = attachmentLoader;
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> message = dequeue();
+                performV1ToV2Migration(message).join();
+            } catch (Exception e) {
+                LOGGER.error("Error occured in migration thread", e);
+            }
+        }
+    }
+
+    private Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> dequeue() {
+        while (true) {
+            Optional<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>> poll = poll();
+            if (poll.isPresent()) {
+                return poll.get();
+            }
+            try {
+                Thread.sleep(POLL_INTERVAL_IN_MS);
+            } catch (InterruptedException e) {
+                Throwables.propagate(e);
+            }
+        }
+    }
+
+    private Optional<Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>>> poll() {
+        synchronized (messagesToBeMigrated) {
+            return Optional.ofNullable(messagesToBeMigrated.poll());
+        }
+    }
+
+    private CompletableFuture<Void> 
performV1ToV2Migration(Pair<MessageWithoutAttachment, 
Stream<MessageAttachmentRepresentation>> messageV1) {
+        return attachmentLoader.addAttachmentToMessages(Stream.of(messageV1), 
MessageMapper.FetchType.Full)
+            .thenApply(stream -> stream.findAny().get())
+            .thenCompose(this::performV1ToV2Migration);
+    }
+
+    private CompletableFuture<Void> 
performV1ToV2Migration(SimpleMailboxMessage message) {
+        return saveInV2FromV1(message)
+            .thenCompose(this::deleteInV1);
+    }
+
+    private CompletableFuture<Void> deleteInV1(Optional<SimpleMailboxMessage> 
optional) {
+        return optional.map(SimpleMailboxMessage::getMessageId)
+            .map(messageId -> (CassandraMessageId) messageId)
+            .map(messageDAOV1::delete)
+            .orElse(CompletableFuture.completedFuture(null));
+    }
+
+    private CompletableFuture<Optional<SimpleMailboxMessage>> 
saveInV2FromV1(SimpleMailboxMessage message) {
+        try {
+            return messageDAOV2.save(message).thenApply(any -> 
Optional.of(message));
+        } catch (MailboxException e) {
+            LOGGER.error("Exception while saving message during migration", e);
+            return 
CompletableFuture.completedFuture(Optional.<SimpleMailboxMessage>empty());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/eb55762a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
index 96b2f95..57ec014 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/V1ToV2MigrationTest.java
@@ -22,6 +22,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Date;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 
 import javax.mail.Flags;
 import javax.mail.util.SharedByteArrayInputStream;
@@ -33,12 +35,12 @@ import 
org.apache.james.backends.cassandra.init.CassandraModuleComposite;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
-import org.apache.james.mailbox.cassandra.mail.utils.Limit;
 import org.apache.james.mailbox.cassandra.mail.CassandraAttachmentMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraBlobsDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraMessageDAOV2;
 import org.apache.james.mailbox.cassandra.mail.MessageAttachmentRepresentation;
+import org.apache.james.mailbox.cassandra.mail.utils.Limit;
 import org.apache.james.mailbox.cassandra.modules.CassandraAttachmentModule;
 import org.apache.james.mailbox.cassandra.modules.CassandraBlobModule;
 import org.apache.james.mailbox.cassandra.modules.CassandraMessageModule;
@@ -61,6 +63,9 @@ import org.junit.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableList;
+import com.jayway.awaitility.Awaitility;
+import com.jayway.awaitility.Duration;
+import com.jayway.awaitility.core.ConditionFactory;
 
 public class V1ToV2MigrationTest {
     private static final int BODY_START = 16;
@@ -85,6 +90,7 @@ public class V1ToV2MigrationTest {
     
     @Rule
     public final JUnitSoftAssertions softly = new JUnitSoftAssertions();
+    private ConditionFactory awaitability;
 
     @Before
     public void setUp() {
@@ -126,10 +132,18 @@ public class V1ToV2MigrationTest {
             .isInline(true)
             .name("toto.png")
             .build();
+
+        Duration slowPacedPollInterval = Duration.FIVE_HUNDRED_MILLISECONDS;
+        awaitability = Awaitility
+            .with()
+            .pollInterval(slowPacedPollInterval)
+            .and()
+            .pollDelay(slowPacedPollInterval).await();
     }
 
     @After
     public void tearDown() {
+        testee.stop();
         cassandra.clearAllTables();
         cassandra.close();
     }
@@ -142,15 +156,13 @@ public class V1ToV2MigrationTest {
 
         
testee.getFromV2orElseFromV1AfterMigration(CassandraMessageDAOV2.notFound(metaData)).join();
 
-        Optional<CassandraMessageDAOV2.MessageResult> messageResult = 
messageDAOV2.retrieveMessages(metaDataList, MessageMapper.FetchType.Full, 
Limit.unlimited())
-            .join()
-            .findAny();
+        awaitMigration();
 
-        assertThat(messageResult.isPresent()).isTrue();
-        
softly.assertThat(messageResult.get().message().getLeft().getMessageId()).isEqualTo(messageId);
-        
softly.assertThat(IOUtils.toString(messageResult.get().message().getLeft().getContent(),
 Charsets.UTF_8))
+        CassandraMessageDAOV2.MessageResult messageResult = 
retrieveMessageOnV2().get();
+        
softly.assertThat(messageResult.message().getLeft().getMessageId()).isEqualTo(messageId);
+        
softly.assertThat(IOUtils.toString(messageResult.message().getLeft().getContent(),
 Charsets.UTF_8))
             .isEqualTo(CONTENT);
-        
softly.assertThat(messageResult.get().message().getRight().findAny().isPresent()).isFalse();
+        
softly.assertThat(messageResult.message().getRight().findAny().isPresent()).isFalse();
     }
 
     @Test
@@ -164,15 +176,13 @@ public class V1ToV2MigrationTest {
 
         
testee.getFromV2orElseFromV1AfterMigration(CassandraMessageDAOV2.notFound(metaData)).join();
 
-        Optional<CassandraMessageDAOV2.MessageResult> messageResult = 
messageDAOV2.retrieveMessages(metaDataList, MessageMapper.FetchType.Full, 
Limit.unlimited())
-            .join()
-            .findAny();
+        awaitMigration();
 
-        assertThat(messageResult.isPresent()).isTrue();
-        
softly.assertThat(messageResult.get().message().getLeft().getMessageId()).isEqualTo(messageId);
-        
softly.assertThat(IOUtils.toString(messageResult.get().message().getLeft().getContent(),
 Charsets.UTF_8))
+        CassandraMessageDAOV2.MessageResult messageResult = 
retrieveMessageOnV2().get();
+        
softly.assertThat(messageResult.message().getLeft().getMessageId()).isEqualTo(messageId);
+        
softly.assertThat(IOUtils.toString(messageResult.message().getLeft().getContent(),
 Charsets.UTF_8))
             .isEqualTo(CONTENT);
-        
softly.assertThat(messageResult.get().message().getRight().findAny().get()).isEqualTo(MessageAttachmentRepresentation.builder()
+        
softly.assertThat(messageResult.message().getRight().findAny().get()).isEqualTo(MessageAttachmentRepresentation.builder()
             .attachmentId(attachment.getAttachmentId())
             .cid(OptionalConverter.fromGuava(messageAttachment.getCid()))
             .isInline(messageAttachment.isInline())
@@ -180,6 +190,27 @@ public class V1ToV2MigrationTest {
             .build());
     }
 
+    private void awaitMigration() {
+        awaitability.atMost(1, TimeUnit.MINUTES)
+            .until(() -> {
+                try {
+                    retrieveMessageOnV2();
+                    return true;
+                } catch(AssertionError e) {
+                    return false;
+                }
+            });
+    }
+
+    private Optional<CassandraMessageDAOV2.MessageResult> 
retrieveMessageOnV2() {
+        Optional<CassandraMessageDAOV2.MessageResult> messageResult = 
messageDAOV2.retrieveMessages(metaDataList, MessageMapper.FetchType.Full, 
Limit.unlimited())
+            .join()
+            .findAny();
+
+        assertThat(messageResult.isPresent()).isTrue();
+        return messageResult;
+    }
+
     private SimpleMailboxMessage createMessage(MessageId messageId, String 
content, int bodyStart, PropertyBuilder propertyBuilder, 
List<MessageAttachment> attachments) {
         return new SimpleMailboxMessage(messageId, new Date(), 
content.length(), bodyStart, new 
SharedByteArrayInputStream(content.getBytes()), new Flags(), propertyBuilder, 
MAILBOX_ID, attachments);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to