Repository: james-project
Updated Branches:
  refs/heads/master 09c9d34c6 -> 9505324c1


JAMES-2038 Move collect out of future

A bug exists: Datastax driver hangs when collecting in a future.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/12ac91f4
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/12ac91f4
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/12ac91f4

Branch: refs/heads/master
Commit: 12ac91f4ba80adc809c8b8e68fa913fa058df931
Parents: 09c9d34
Author: benwa <[email protected]>
Authored: Mon May 29 07:59:29 2017 +0700
Committer: benwa <[email protected]>
Committed: Tue May 30 15:31:34 2017 +0700

----------------------------------------------------------------------
 .../mail/CassandraMailboxRecentsDAO.java        |  6 ++--
 .../cassandra/mail/CassandraMessageMapper.java  |  3 +-
 .../mail/CassandraIndexTableHandlerTest.java    | 18 ++++++----
 .../mail/CassandraMailboxRecentDAOTest.java     | 38 ++++++++++++++++----
 4 files changed, 48 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/12ac91f4/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
index 06df836..48a6a9c 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java
@@ -27,6 +27,7 @@ import static 
com.datastax.driver.core.querybuilder.QueryBuilder.select;
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
 
 import javax.inject.Inject;
 
@@ -78,12 +79,11 @@ public class CassandraMailboxRecentsDAO {
                 .value(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, 
bindMarker(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID)));
     }
 
-    public CompletableFuture<List<MessageUid>> 
getRecentMessageUidsInMailbox(CassandraId mailboxId) {
+    public CompletableFuture<Stream<MessageUid>> 
getRecentMessageUidsInMailbox(CassandraId mailboxId) {
         return cassandraAsyncExecutor.execute(bindWithMailbox(mailboxId, 
readStatement))
             .thenApply(CassandraUtils::convertToStream)
             .thenApply(stream -> stream.map(row -> 
row.getLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID)))
-            .thenApply(stream -> stream.map(MessageUid::of))
-            .thenApply(stream -> stream.collect(Guavate.toImmutableList()));
+            .thenApply(stream -> stream.map(MessageUid::of));
     }
 
     private BoundStatement bindWithMailbox(CassandraId mailboxId, 
PreparedStatement statement) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/12ac91f4/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 8d00e77..b4ac929 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -202,7 +202,8 @@ public class CassandraMessageMapper implements 
MessageMapper {
     public List<MessageUid> findRecentMessageUidsInMailbox(Mailbox mailbox) 
throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         return mailboxRecentDAO.getRecentMessageUidsInMailbox(mailboxId)
-                .join();
+                .join()
+                .collect(Guavate.toImmutableList());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/james-project/blob/12ac91f4/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
index b82fb4a..e1b9f34 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java
@@ -149,7 +149,8 @@ public class CassandraIndexTableHandlerTest {
 
         testee.updateIndexOnAdd(message, MAILBOX_ID).join();
 
-        
assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join())
+        
assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join()
+            .collect(Guavate.toImmutableList()))
             .isEmpty();
     }
 
@@ -161,7 +162,8 @@ public class CassandraIndexTableHandlerTest {
 
         testee.updateIndexOnAdd(message, MAILBOX_ID).join();
 
-        
assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join())
+        
assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join()
+            .collect(Guavate.toImmutableList()))
             .containsOnly(MESSAGE_UID);
     }
 
@@ -232,7 +234,8 @@ public class CassandraIndexTableHandlerTest {
                 MODSEQ),
             MAILBOX_ID).join();
 
-        
assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join())
+        
assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join()
+            .collect(Guavate.toImmutableList()))
             .isEmpty();
     }
 
@@ -250,7 +253,8 @@ public class CassandraIndexTableHandlerTest {
                 MODSEQ),
             MAILBOX_ID).join();
 
-        
assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join())
+        
assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join()
+            .collect(Guavate.toImmutableList()))
             .isEmpty();
     }
 
@@ -476,7 +480,8 @@ public class CassandraIndexTableHandlerTest {
             .modSeq(MODSEQ)
             .build()).join();
 
-        
assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join())
+        
assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join()
+            .collect(Guavate.toImmutableList()))
             .containsOnly(MESSAGE_UID);
     }
 
@@ -495,7 +500,8 @@ public class CassandraIndexTableHandlerTest {
             .modSeq(MODSEQ)
             .build()).join();
 
-        
assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join())
+        
assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join()
+            .collect(Guavate.toImmutableList()))
             .isEmpty();
     }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/12ac91f4/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java
 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java
index 81bb313..0517b4f 100644
--- 
a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java
+++ 
b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java
@@ -21,14 +21,19 @@ package org.apache.james.mailbox.cassandra.mail;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.stream.IntStream;
+
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.cassandra.CassandraId;
 import 
org.apache.james.mailbox.cassandra.modules.CassandraMailboxRecentsModule;
+import org.apache.james.util.FluentFutureStream;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.github.steveash.guavate.Guavate;
+
 public class CassandraMailboxRecentDAOTest {
     public static final MessageUid UID1 = MessageUid.of(36L);
     public static final MessageUid UID2 = MessageUid.of(37L);
@@ -52,14 +57,16 @@ public class CassandraMailboxRecentDAOTest {
 
     @Test
     public void getRecentMessageUidsInMailboxShouldBeEmptyByDefault() throws 
Exception {
-        
assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).isEmpty();
+        assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()
+            .collect(Guavate.toImmutableList())).isEmpty();
     }
 
     @Test
     public void addToRecentShouldAddUidWhenEmpty() throws Exception {
         testee.addToRecent(CASSANDRA_ID, UID1).join();
 
-        
assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).containsOnly(UID1);
+        assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()
+            .collect(Guavate.toImmutableList())).containsOnly(UID1);
     }
 
     @Test
@@ -68,14 +75,16 @@ public class CassandraMailboxRecentDAOTest {
 
         testee.removeFromRecent(CASSANDRA_ID, UID1).join();
 
-        
assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).isEmpty();
+        assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()
+            .collect(Guavate.toImmutableList())).isEmpty();
     }
 
     @Test
     public void removeFromRecentShouldNotFailIfNotExisting() throws Exception {
         testee.removeFromRecent(CASSANDRA_ID, UID1).join();
 
-        
assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).isEmpty();
+        assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()
+            .collect(Guavate.toImmutableList())).isEmpty();
     }
 
     @Test
@@ -84,7 +93,8 @@ public class CassandraMailboxRecentDAOTest {
 
         testee.addToRecent(CASSANDRA_ID, UID2).join();
 
-        
assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).containsOnly(UID1,
 UID2);
+        assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()
+            .collect(Guavate.toImmutableList())).containsOnly(UID1, UID2);
     }
 
     @Test
@@ -94,7 +104,8 @@ public class CassandraMailboxRecentDAOTest {
 
         testee.removeFromRecent(CASSANDRA_ID, UID2).join();
 
-        
assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).containsOnly(UID1);
+        assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()
+            .collect(Guavate.toImmutableList())).containsOnly(UID1);
     }
 
     @Test
@@ -102,6 +113,19 @@ public class CassandraMailboxRecentDAOTest {
         testee.addToRecent(CASSANDRA_ID, UID1).join();
         testee.addToRecent(CASSANDRA_ID, UID1).join();
 
-        
assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).containsOnly(UID1);
+        assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()
+            .collect(Guavate.toImmutableList())).containsOnly(UID1);
+    }
+
+    @Test
+    public void 
getRecentMessageUidsInMailboxShouldNotTimeoutWhenOverPagingLimit() throws 
Exception {
+        int pageSize = 5000;
+        int size = pageSize + 1000;
+        FluentFutureStream.of(IntStream.range(0, size)
+            .mapToObj(i -> testee.addToRecent(CASSANDRA_ID, MessageUid.of(i + 
1))))
+            .join();
+
+        assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()
+            .collect(Guavate.toImmutableList())).hasSize(size);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to