Repository: james-project Updated Branches: refs/heads/master 09c9d34c6 -> 9505324c1
JAMES-2038 Move collect out of future A bug exists: Datastax driver hangs when collecting in a future. Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/12ac91f4 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/12ac91f4 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/12ac91f4 Branch: refs/heads/master Commit: 12ac91f4ba80adc809c8b8e68fa913fa058df931 Parents: 09c9d34 Author: benwa <[email protected]> Authored: Mon May 29 07:59:29 2017 +0700 Committer: benwa <[email protected]> Committed: Tue May 30 15:31:34 2017 +0700 ---------------------------------------------------------------------- .../mail/CassandraMailboxRecentsDAO.java | 6 ++-- .../cassandra/mail/CassandraMessageMapper.java | 3 +- .../mail/CassandraIndexTableHandlerTest.java | 18 ++++++---- .../mail/CassandraMailboxRecentDAOTest.java | 38 ++++++++++++++++---- 4 files changed, 48 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/12ac91f4/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java index 06df836..48a6a9c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentsDAO.java @@ -27,6 +27,7 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.stream.Stream; import javax.inject.Inject; @@ -78,12 +79,11 @@ public class CassandraMailboxRecentsDAO { .value(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID, bindMarker(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID))); } - public CompletableFuture<List<MessageUid>> getRecentMessageUidsInMailbox(CassandraId mailboxId) { + public CompletableFuture<Stream<MessageUid>> getRecentMessageUidsInMailbox(CassandraId mailboxId) { return cassandraAsyncExecutor.execute(bindWithMailbox(mailboxId, readStatement)) .thenApply(CassandraUtils::convertToStream) .thenApply(stream -> stream.map(row -> row.getLong(CassandraMailboxRecentsTable.RECENT_MESSAGE_UID))) - .thenApply(stream -> stream.map(MessageUid::of)) - .thenApply(stream -> stream.collect(Guavate.toImmutableList())); + .thenApply(stream -> stream.map(MessageUid::of)); } private BoundStatement bindWithMailbox(CassandraId mailboxId, PreparedStatement statement) { http://git-wip-us.apache.org/repos/asf/james-project/blob/12ac91f4/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java index 8d00e77..b4ac929 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java @@ -202,7 +202,8 @@ public class CassandraMessageMapper implements MessageMapper { public List<MessageUid> findRecentMessageUidsInMailbox(Mailbox mailbox) throws MailboxException { CassandraId mailboxId = (CassandraId) mailbox.getMailboxId(); return mailboxRecentDAO.getRecentMessageUidsInMailbox(mailboxId) - .join(); + .join() + .collect(Guavate.toImmutableList()); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/12ac91f4/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java index b82fb4a..e1b9f34 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraIndexTableHandlerTest.java @@ -149,7 +149,8 @@ public class CassandraIndexTableHandlerTest { testee.updateIndexOnAdd(message, MAILBOX_ID).join(); - assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join()) + assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join() + .collect(Guavate.toImmutableList())) .isEmpty(); } @@ -161,7 +162,8 @@ public class CassandraIndexTableHandlerTest { testee.updateIndexOnAdd(message, MAILBOX_ID).join(); - assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join()) + assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join() + .collect(Guavate.toImmutableList())) .containsOnly(MESSAGE_UID); } @@ -232,7 +234,8 @@ public class CassandraIndexTableHandlerTest { MODSEQ), MAILBOX_ID).join(); - assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join()) + assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join() + .collect(Guavate.toImmutableList())) .isEmpty(); } @@ -250,7 +253,8 @@ public class CassandraIndexTableHandlerTest { MODSEQ), MAILBOX_ID).join(); - assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join()) + assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join() + .collect(Guavate.toImmutableList())) .isEmpty(); } @@ -476,7 +480,8 @@ public class CassandraIndexTableHandlerTest { .modSeq(MODSEQ) .build()).join(); - assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join()) + assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join() + .collect(Guavate.toImmutableList())) .containsOnly(MESSAGE_UID); } @@ -495,7 +500,8 @@ public class CassandraIndexTableHandlerTest { .modSeq(MODSEQ) .build()).join(); - assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join()) + assertThat(mailboxRecentsDAO.getRecentMessageUidsInMailbox(MAILBOX_ID).join() + .collect(Guavate.toImmutableList())) .isEmpty(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/12ac91f4/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java ---------------------------------------------------------------------- diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java index 81bb313..0517b4f 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxRecentDAOTest.java @@ -21,14 +21,19 @@ package org.apache.james.mailbox.cassandra.mail; import static org.assertj.core.api.Assertions.assertThat; +import java.util.stream.IntStream; + import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.mailbox.MessageUid; import org.apache.james.mailbox.cassandra.CassandraId; import org.apache.james.mailbox.cassandra.modules.CassandraMailboxRecentsModule; +import org.apache.james.util.FluentFutureStream; import org.junit.After; import org.junit.Before; import org.junit.Test; +import com.github.steveash.guavate.Guavate; + public class CassandraMailboxRecentDAOTest { public static final MessageUid UID1 = MessageUid.of(36L); public static final MessageUid UID2 = MessageUid.of(37L); @@ -52,14 +57,16 @@ public class CassandraMailboxRecentDAOTest { @Test public void getRecentMessageUidsInMailboxShouldBeEmptyByDefault() throws Exception { - assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).isEmpty(); + assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join() + .collect(Guavate.toImmutableList())).isEmpty(); } @Test public void addToRecentShouldAddUidWhenEmpty() throws Exception { testee.addToRecent(CASSANDRA_ID, UID1).join(); - assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).containsOnly(UID1); + assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join() + .collect(Guavate.toImmutableList())).containsOnly(UID1); } @Test @@ -68,14 +75,16 @@ public class CassandraMailboxRecentDAOTest { testee.removeFromRecent(CASSANDRA_ID, UID1).join(); - assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).isEmpty(); + assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join() + .collect(Guavate.toImmutableList())).isEmpty(); } @Test public void removeFromRecentShouldNotFailIfNotExisting() throws Exception { testee.removeFromRecent(CASSANDRA_ID, UID1).join(); - assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).isEmpty(); + assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join() + .collect(Guavate.toImmutableList())).isEmpty(); } @Test @@ -84,7 +93,8 @@ public class CassandraMailboxRecentDAOTest { testee.addToRecent(CASSANDRA_ID, UID2).join(); - assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).containsOnly(UID1, UID2); + assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join() + .collect(Guavate.toImmutableList())).containsOnly(UID1, UID2); } @Test @@ -94,7 +104,8 @@ public class CassandraMailboxRecentDAOTest { testee.removeFromRecent(CASSANDRA_ID, UID2).join(); - assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).containsOnly(UID1); + assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join() + .collect(Guavate.toImmutableList())).containsOnly(UID1); } @Test @@ -102,6 +113,19 @@ public class CassandraMailboxRecentDAOTest { testee.addToRecent(CASSANDRA_ID, UID1).join(); testee.addToRecent(CASSANDRA_ID, UID1).join(); - assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join()).containsOnly(UID1); + assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join() + .collect(Guavate.toImmutableList())).containsOnly(UID1); + } + + @Test + public void getRecentMessageUidsInMailboxShouldNotTimeoutWhenOverPagingLimit() throws Exception { + int pageSize = 5000; + int size = pageSize + 1000; + FluentFutureStream.of(IntStream.range(0, size) + .mapToObj(i -> testee.addToRecent(CASSANDRA_ID, MessageUid.of(i + 1)))) + .join(); + + assertThat(testee.getRecentMessageUidsInMailbox(CASSANDRA_ID).join() + .collect(Guavate.toImmutableList())).hasSize(size); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
