Repository: james-project
Updated Branches:
  refs/heads/master c1379a10a -> fbb8b42b8


JAMES-1945 Improve parallelism of retrieving messages with attachments


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/fbb8b42b
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/fbb8b42b
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/fbb8b42b

Branch: refs/heads/master
Commit: fbb8b42b8e71c0c80fecbc74cf6011d3d247d786
Parents: c1379a1
Author: benwa <[email protected]>
Authored: Wed Feb 22 12:21:19 2017 +0700
Committer: benwa <[email protected]>
Committed: Wed Feb 22 13:03:48 2017 +0700

----------------------------------------------------------------------
 .../CassandraMailboxSessionMapperFactory.java   |  6 ++-
 .../cassandra/mail/AttachmentLoader.java        | 35 +++++++++------
 .../mail/CassandraAttachmentMapper.java         |  9 ++--
 .../mail/CassandraMessageIdMapper.java          | 25 ++++++-----
 .../cassandra/mail/CassandraMessageMapper.java  | 15 ++++---
 .../cassandra/mail/AttachmentLoaderTest.java    | 47 +++++++++++---------
 6 files changed, 80 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/fbb8b42b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 3172410..a1a7ff3 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -101,13 +101,15 @@ public class CassandraMailboxSessionMapperFactory extends 
MailboxSessionMapperFa
 
     @Override
     public CassandraMessageMapper createMessageMapper(MailboxSession 
mailboxSession) {
-        return new CassandraMessageMapper(uidProvider, modSeqProvider, null, 
maxRetry, createAttachmentMapper(mailboxSession),
+        return new CassandraMessageMapper(uidProvider, modSeqProvider, null, 
maxRetry,
+                (CassandraAttachmentMapper) 
createAttachmentMapper(mailboxSession),
                 messageDAO, messageIdDAO, imapUidDAO, mailboxCounterDAO, 
mailboxRecentsDAO, indexTableHandler, firstUnseenDAO);
     }
 
     @Override
     public MessageIdMapper createMessageIdMapper(MailboxSession 
mailboxSession) throws MailboxException {
-        return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), 
mailboxDAO, getAttachmentMapper(mailboxSession),
+        return new CassandraMessageIdMapper(getMailboxMapper(mailboxSession), 
mailboxDAO,
+                (CassandraAttachmentMapper) 
getAttachmentMapper(mailboxSession),
                 imapUidDAO, messageIdDAO, messageDAO, indexTableHandler, 
modSeqProvider, mailboxSession);
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/fbb8b42b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
index c07c033..f14982a 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoader.java
@@ -22,33 +22,36 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
 import org.apache.james.mailbox.model.Attachment;
 import org.apache.james.mailbox.model.AttachmentId;
 import org.apache.james.mailbox.model.MessageAttachment;
-import org.apache.james.mailbox.store.mail.AttachmentMapper;
 import org.apache.james.util.OptionalConverter;
 
 import com.github.steveash.guavate.Guavate;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 
 public class AttachmentLoader {
 
-    private final AttachmentMapper attachmentMapper;
+    private final CassandraAttachmentMapper attachmentMapper;
 
-    public AttachmentLoader(AttachmentMapper attachmentMapper) {
+    public AttachmentLoader(CassandraAttachmentMapper attachmentMapper) {
         this.attachmentMapper = attachmentMapper;
     }
 
-    public Collection<MessageAttachment> 
getAttachments(List<CassandraMessageDAO.MessageAttachmentRepresentation> 
attachmentRepresentations) {
-        Map<AttachmentId, Attachment> attachmentsById = 
attachmentsById(attachmentRepresentations.stream()
-            
.map(CassandraMessageDAO.MessageAttachmentRepresentation::getAttachmentId)
-            .collect(Guavate.toImmutableSet()));
+    public CompletableFuture<Collection<MessageAttachment>> 
getAttachments(List<CassandraMessageDAO.MessageAttachmentRepresentation> 
attachmentRepresentations) {
+        CompletableFuture<Map<AttachmentId, Attachment>> attachmentsByIdFuture 
=
+            attachmentsById(attachmentRepresentations.stream()
+                
.map(CassandraMessageDAO.MessageAttachmentRepresentation::getAttachmentId)
+                .collect(Guavate.toImmutableSet()));
 
-        return attachmentRepresentations.stream()
-            .map(representation -> 
constructMessageAttachment(attachmentsById.get(representation.getAttachmentId()),
 representation))
-            .collect(Guavate.toImmutableList());
+        return attachmentsByIdFuture.thenApply(attachmentsById ->
+            attachmentRepresentations.stream()
+                .map(representation -> 
constructMessageAttachment(attachmentsById.get(representation.getAttachmentId()),
 representation))
+                .collect(Guavate.toImmutableList()));
     }
 
     private MessageAttachment constructMessageAttachment(Attachment 
attachment, CassandraMessageDAO.MessageAttachmentRepresentation 
messageAttachmentRepresentation) {
@@ -60,9 +63,15 @@ public class AttachmentLoader {
                 .build();
     }
 
-    @VisibleForTesting Map<AttachmentId, Attachment> 
attachmentsById(Set<AttachmentId> attachmentIds) {
-        return attachmentMapper.getAttachments(attachmentIds).stream()
-            .collect(Guavate.toImmutableMap(Attachment::getAttachmentId, 
Function.identity()));
+    @VisibleForTesting
+    CompletableFuture<Map<AttachmentId, Attachment>> 
attachmentsById(Set<AttachmentId> attachmentIds) {
+        if (attachmentIds.isEmpty()) {
+            return CompletableFuture.completedFuture(ImmutableMap.of());
+        }
+        return attachmentMapper.getAttachmentsAsFuture(attachmentIds)
+            .thenApply(attachments -> attachments
+                .stream()
+                .collect(Guavate.toImmutableMap(Attachment::getAttachmentId, 
Function.identity())));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/fbb8b42b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
index c1f9af0..cc295ec 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraAttachmentMapper.java
@@ -93,9 +93,13 @@ public class CassandraAttachmentMapper implements 
AttachmentMapper {
 
     @Override
     public List<Attachment> getAttachments(Collection<AttachmentId> 
attachmentIds) {
+        return getAttachmentsAsFuture(attachmentIds).join();
+    }
+
+    public CompletableFuture<List<Attachment>> 
getAttachmentsAsFuture(Collection<AttachmentId> attachmentIds) {
         Preconditions.checkArgument(attachmentIds != null);
         if (attachmentIds.isEmpty()) {
-            return ImmutableList.of();
+            return CompletableFuture.completedFuture(ImmutableList.of());
         }
         List<String> ids = attachmentIds.stream()
                 .map(AttachmentId::getId)
@@ -104,8 +108,7 @@ public class CassandraAttachmentMapper implements 
AttachmentMapper {
             select(FIELDS)
                 .from(TABLE_NAME)
                 .where(in(ID, ids)))
-            .thenApply(this::attachments)
-            .join();
+            .thenApply(this::attachments);
     }
 
     private List<Attachment> attachments(ResultSet resultSet) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/fbb8b42b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 17768cb..bc950b7 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -43,7 +43,6 @@ import org.apache.james.mailbox.model.MessageAttachment;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.UpdatedFlags;
 import org.apache.james.mailbox.store.FlagsUpdateCalculator;
-import org.apache.james.mailbox.store.mail.AttachmentMapper;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.MessageIdMapper;
 import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
@@ -67,26 +66,26 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
 
     private final MailboxMapper mailboxMapper;
     private final CassandraMailboxDAO mailboxDAO;
-    private final AttachmentMapper attachmentMapper;
     private final CassandraMessageIdToImapUidDAO imapUidDAO;
     private final CassandraMessageIdDAO messageIdDAO;
     private final CassandraMessageDAO messageDAO;
     private final CassandraIndexTableHandler indexTableHandler;
     private final ModSeqProvider modSeqProvider;
     private final MailboxSession mailboxSession;
+    private final AttachmentLoader attachmentLoader;
 
-    public CassandraMessageIdMapper(MailboxMapper mailboxMapper, 
CassandraMailboxDAO mailboxDAO, AttachmentMapper attachmentMapper,
+    public CassandraMessageIdMapper(MailboxMapper mailboxMapper, 
CassandraMailboxDAO mailboxDAO, CassandraAttachmentMapper attachmentMapper,
                                     CassandraMessageIdToImapUidDAO imapUidDAO, 
CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO,
                                     CassandraIndexTableHandler 
indexTableHandler, ModSeqProvider modSeqProvider, MailboxSession 
mailboxSession) {
         this.mailboxMapper = mailboxMapper;
         this.mailboxDAO = mailboxDAO;
-        this.attachmentMapper = attachmentMapper;
         this.imapUidDAO = imapUidDAO;
         this.messageIdDAO = messageIdDAO;
         this.messageDAO = messageDAO;
         this.indexTableHandler = indexTableHandler;
         this.modSeqProvider = modSeqProvider;
         this.mailboxSession = mailboxSession;
+        this.attachmentLoader = new AttachmentLoader(attachmentMapper);
     }
 
     @Override
@@ -105,9 +104,10 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
             .thenCompose(stream -> CompletableFutureUtil.allOf(
                 stream.map(pair -> mailboxExists(pair.getLeft())
                     .thenApply(b -> Optional.of(pair).filter(any -> b)))))
+            .thenApply(stream -> stream.flatMap(OptionalConverter::toStream))
+            .thenApply(stream -> stream.map(loadAttachments(fetchType)))
+            .thenCompose(CompletableFutureUtil::allOf)
             .join()
-            .flatMap(OptionalConverter::toStream)
-            .map(loadAttachments(fetchType))
             .map(toMailboxMessages())
             .sorted(Comparator.comparing(MailboxMessage::getUid));
     }
@@ -126,14 +126,15 @@ public class CassandraMessageIdMapper implements 
MessageIdMapper {
             });
     }
 
-    private Function<Pair<CassandraMessageDAO.MessageWithoutAttachment, 
Stream<CassandraMessageDAO.MessageAttachmentRepresentation>>, 
Pair<CassandraMessageDAO.MessageWithoutAttachment, Stream<MessageAttachment>>> 
loadAttachments(FetchType fetchType) {
+    private Function<Pair<CassandraMessageDAO.MessageWithoutAttachment, 
Stream<CassandraMessageDAO.MessageAttachmentRepresentation>>,
+                     
CompletableFuture<Pair<CassandraMessageDAO.MessageWithoutAttachment, 
Stream<MessageAttachment>>>>
+                     loadAttachments(FetchType fetchType) {
         if (fetchType == FetchType.Full || fetchType == FetchType.Body) {
-            return pair -> Pair.of(pair.getLeft(),
-                new AttachmentLoader(attachmentMapper)
-                    
.getAttachments(pair.getRight().collect(Guavate.toImmutableList()))
-                    .stream());
+            return pair -> attachmentLoader
+                
.getAttachments(pair.getRight().collect(Guavate.toImmutableList()))
+                .thenApply(attachments -> Pair.of(pair.getLeft(), 
attachments.stream()));
         } else {
-            return pair -> Pair.of(pair.getLeft(), Stream.of());
+            return pair -> 
CompletableFuture.completedFuture(Pair.of(pair.getLeft(), Stream.of()));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/fbb8b42b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index c81ed79..c439e49 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -56,6 +56,7 @@ import org.apache.james.mailbox.store.mail.UidProvider;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.util.CompletableFutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +76,6 @@ public class CassandraMessageMapper implements MessageMapper {
     private final MailboxSession mailboxSession;
     private final UidProvider uidProvider;
     private final int maxRetries;
-    private final AttachmentMapper attachmentMapper;
     private final CassandraMessageDAO messageDAO;
     private final CassandraMessageIdDAO messageIdDAO;
     private final CassandraMessageIdToImapUidDAO imapUidDAO;
@@ -83,9 +83,10 @@ public class CassandraMessageMapper implements MessageMapper 
{
     private final CassandraMailboxRecentsDAO mailboxRecentDAO;
     private final CassandraIndexTableHandler indexTableHandler;
     private final CassandraFirstUnseenDAO firstUnseenDAO;
+    private final AttachmentLoader attachmentLoader;
 
     public CassandraMessageMapper(UidProvider uidProvider, ModSeqProvider 
modSeqProvider,
-                                  MailboxSession mailboxSession, int 
maxRetries, AttachmentMapper attachmentMapper,
+                                  MailboxSession mailboxSession, int 
maxRetries, CassandraAttachmentMapper attachmentMapper,
                                   CassandraMessageDAO messageDAO, 
CassandraMessageIdDAO messageIdDAO, CassandraMessageIdToImapUidDAO imapUidDAO,
                                   CassandraMailboxCounterDAO 
mailboxCounterDAO, CassandraMailboxRecentsDAO mailboxRecentDAO,
                                   CassandraIndexTableHandler 
indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO) {
@@ -93,7 +94,6 @@ public class CassandraMessageMapper implements MessageMapper {
         this.modSeqProvider = modSeqProvider;
         this.mailboxSession = mailboxSession;
         this.maxRetries = maxRetries;
-        this.attachmentMapper = attachmentMapper;
         this.messageDAO = messageDAO;
         this.messageIdDAO = messageIdDAO;
         this.imapUidDAO = imapUidDAO;
@@ -101,6 +101,7 @@ public class CassandraMessageMapper implements 
MessageMapper {
         this.mailboxRecentDAO = mailboxRecentDAO;
         this.indexTableHandler = indexTableHandler;
         this.firstUnseenDAO = firstUnseenDAO;
+        this.attachmentLoader = new AttachmentLoader(attachmentMapper);
     }
 
     @Override
@@ -168,10 +169,10 @@ public class CassandraMessageMapper implements 
MessageMapper {
         Stream<Pair<CassandraMessageDAO.MessageWithoutAttachment, 
Stream<CassandraMessageDAO.MessageAttachmentRepresentation>>>
             messageRepresentions = messageDAO.retrieveMessages(messageIds, 
fetchType, limit).join();
         if (fetchType == FetchType.Body || fetchType == FetchType.Full) {
-            return messageRepresentions
-                .map(pair -> Pair.of(pair.getLeft(), new 
AttachmentLoader(attachmentMapper)
-                    .getAttachments(pair.getRight()
-                        .collect(Guavate.toImmutableList()))))
+            return CompletableFutureUtil.allOf(messageRepresentions
+                .map(pair -> 
attachmentLoader.getAttachments(pair.getRight().collect(Guavate.toImmutableList()))
+                    .thenApply(attachments -> Pair.of(pair.getLeft(), 
attachments))))
+                .join()
                 .map(Throwing.function(pair -> pair.getLeft()
                     .toMailboxMessage(pair.getRight()
                         .stream()

http://git-wip-us.apache.org/repos/asf/james-project/blob/fbb8b42b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
index 999a26f..0474727 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/AttachmentLoaderTest.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.james.mailbox.model.Attachment;
 import org.apache.james.mailbox.model.AttachmentId;
@@ -42,12 +43,12 @@ import com.google.common.collect.ImmutableSet;
 
 public class AttachmentLoaderTest {
 
-    private AttachmentMapper attachmentMapper;
+    private CassandraAttachmentMapper attachmentMapper;
     private AttachmentLoader testee;
 
     @Before
     public void setup() {
-        attachmentMapper = mock(AttachmentMapper.class);
+        attachmentMapper = mock(CassandraAttachmentMapper.class);
         testee = new AttachmentLoader(attachmentMapper);
     }
 
@@ -61,15 +62,16 @@ public class AttachmentLoaderTest {
             .bytes("attachment".getBytes())
             .type("type")
             .build();
-        when(attachmentMapper.getAttachments(attachmentIds))
-            .thenReturn(ImmutableList.of(attachment));
+        when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
+            
.thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment)));
 
         Optional<String> name = Optional.of("name1");
         Optional<Cid> cid = Optional.empty();
         boolean isInlined = false;
         CassandraMessageDAO.MessageAttachmentRepresentation 
attachmentRepresentation = new 
CassandraMessageDAO.MessageAttachmentRepresentation(attachmentId, name, cid, 
isInlined);
 
-        Collection<MessageAttachment> attachments = 
testee.getAttachments(ImmutableList.of(attachmentRepresentation, 
attachmentRepresentation));
+        Collection<MessageAttachment> attachments = 
testee.getAttachments(ImmutableList.of(attachmentRepresentation, 
attachmentRepresentation))
+            .join();
 
         MessageAttachment expectedAttachment = new 
MessageAttachment(attachment, OptionalConverter.toGuava(name), 
OptionalConverter.toGuava(cid), isInlined);
         assertThat(attachments).hasSize(2)
@@ -86,8 +88,8 @@ public class AttachmentLoaderTest {
             .bytes("attachment".getBytes())
             .type("type")
             .build();
-        when(attachmentMapper.getAttachments(attachmentIds))
-            .thenReturn(ImmutableList.of(attachment));
+        when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
+            
.thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment)));
 
         Optional<String> name1 = Optional.of("name1");
         Optional<String> name2 = Optional.of("name2");
@@ -96,7 +98,8 @@ public class AttachmentLoaderTest {
         CassandraMessageDAO.MessageAttachmentRepresentation 
attachmentRepresentation1 = new 
CassandraMessageDAO.MessageAttachmentRepresentation(attachmentId, name1, cid, 
isInlined);
         CassandraMessageDAO.MessageAttachmentRepresentation 
attachmentRepresentation2 = new 
CassandraMessageDAO.MessageAttachmentRepresentation(attachmentId, name2, cid, 
isInlined);
 
-        Collection<MessageAttachment> attachments = 
testee.getAttachments(ImmutableList.of(attachmentRepresentation1, 
attachmentRepresentation2));
+        Collection<MessageAttachment> attachments = 
testee.getAttachments(ImmutableList.of(attachmentRepresentation1, 
attachmentRepresentation2))
+            .join();
 
         assertThat(attachments).hasSize(2)
             .containsOnly(new MessageAttachment(attachment, 
OptionalConverter.toGuava(name1), OptionalConverter.toGuava(cid), isInlined),
@@ -119,8 +122,8 @@ public class AttachmentLoaderTest {
             .bytes("attachment2".getBytes())
             .type("type")
             .build();
-        when(attachmentMapper.getAttachments(attachmentIds))
-            .thenReturn(ImmutableList.of(attachment1, attachment2));
+        when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
+            
.thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment1, 
attachment2)));
 
         Optional<String> name1 = Optional.of("name1");
         Optional<String> name2 = Optional.of("name2");
@@ -129,7 +132,8 @@ public class AttachmentLoaderTest {
         CassandraMessageDAO.MessageAttachmentRepresentation 
attachmentRepresentation1 = new 
CassandraMessageDAO.MessageAttachmentRepresentation(attachmentId1, name1, cid, 
isInlined);
         CassandraMessageDAO.MessageAttachmentRepresentation 
attachmentRepresentation2 = new 
CassandraMessageDAO.MessageAttachmentRepresentation(attachmentId2, name2, cid, 
isInlined);
 
-        Collection<MessageAttachment> attachments = 
testee.getAttachments(ImmutableList.of(attachmentRepresentation1, 
attachmentRepresentation2));
+        Collection<MessageAttachment> attachments = 
testee.getAttachments(ImmutableList.of(attachmentRepresentation1, 
attachmentRepresentation2))
+            .join();
 
         assertThat(attachments).hasSize(2)
             .containsOnly(new MessageAttachment(attachment1, 
OptionalConverter.toGuava(name1), OptionalConverter.toGuava(cid), isInlined),
@@ -146,10 +150,11 @@ public class AttachmentLoaderTest {
             .bytes("attachment".getBytes())
             .type("type")
             .build();
-        when(attachmentMapper.getAttachments(attachmentIds))
-            .thenReturn(ImmutableList.of(attachment));
+        when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
+            
.thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment)));
 
-        Collection<MessageAttachment> attachments = 
testee.getAttachments(ImmutableList.of());
+        Collection<MessageAttachment> attachments = 
testee.getAttachments(ImmutableList.of())
+            .join();
 
         assertThat(attachments).isEmpty();
     }
@@ -170,10 +175,11 @@ public class AttachmentLoaderTest {
                 .bytes("attachment2".getBytes())
                 .type("type")
                 .build();
-        when(attachmentMapper.getAttachments(attachmentIds))
-            .thenReturn(ImmutableList.of(attachment, attachment2));
+        when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
+            
.thenReturn(CompletableFuture.completedFuture(ImmutableList.of(attachment, 
attachment2)));
 
-        Map<AttachmentId, Attachment> attachmentsById = 
testee.attachmentsById(attachmentIds);
+        Map<AttachmentId, Attachment> attachmentsById = 
testee.attachmentsById(attachmentIds)
+            .join();
 
         assertThat(attachmentsById).hasSize(2)
                 .containsOnly(MapEntry.entry(attachmentId, attachment), 
MapEntry.entry(attachmentId2, attachment2));
@@ -185,10 +191,11 @@ public class AttachmentLoaderTest {
         AttachmentId attachmentId2 = AttachmentId.from("2");
         Set<AttachmentId> attachmentIds = ImmutableSet.of(attachmentId, 
attachmentId2);
 
-        when(attachmentMapper.getAttachments(attachmentIds))
-                .thenReturn(ImmutableList.of());
+        when(attachmentMapper.getAttachmentsAsFuture(attachmentIds))
+                
.thenReturn(CompletableFuture.completedFuture(ImmutableList.of()));
 
-        Map<AttachmentId, Attachment> attachmentsById = 
testee.attachmentsById(attachmentIds);
+        Map<AttachmentId, Attachment> attachmentsById = 
testee.attachmentsById(attachmentIds)
+            .join();
 
         assertThat(attachmentsById).hasSize(0);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to