JAMES-2544 Initialize RabbitMQ mail queues when creating
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b70ca1a7 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b70ca1a7 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b70ca1a7 Branch: refs/heads/master Commit: b70ca1a7c8cc8e9af66f24b3e15ef73a8022d09f Parents: 6be6b16 Author: Antoine Duprat <adup...@linagora.com> Authored: Fri Sep 14 15:26:44 2018 +0200 Committer: Benoit Tellier <btell...@linagora.com> Committed: Wed Sep 26 09:22:06 2018 +0700 ---------------------------------------------------------------------- .../james/queue/rabbitmq/RabbitMQMailQueue.java | 2 ++ .../queue/rabbitmq/view/api/MailQueueView.java | 3 +++ .../cassandra/CassandraMailQueueMailStore.java | 19 +++---------------- .../view/cassandra/CassandraMailQueueView.java | 7 ++++++- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 9 +++++++++ .../CassandraMailQueueViewTestFactory.java | 9 +++++++++ 6 files changed, 32 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/b70ca1a7/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index c653943..50853aa 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -69,6 +69,8 @@ public class RabbitMQMailQueue implements ManageableMailQueue { } RabbitMQMailQueue create(MailQueueName mailQueueName) { + mailQueueView.initialize(mailQueueName); + return new RabbitMQMailQueue(metricFactory, mailQueueName, new Enqueuer(mailQueueName, rabbitClient, mimeMessageStore, mailReferenceSerializer, metricFactory, mailQueueView, clock), http://git-wip-us.apache.org/repos/asf/james-project/blob/b70ca1a7/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java index 10ad42d..0455ff8 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java @@ -23,10 +23,13 @@ import java.time.Instant; import java.util.concurrent.CompletableFuture; import org.apache.james.queue.api.ManageableMailQueue; +import org.apache.james.queue.rabbitmq.MailQueueName; import org.apache.mailet.Mail; public interface MailQueueView { + void initialize(MailQueueName mailQueueName); + CompletableFuture<Void> storeMail(Instant enqueuedTime, Mail mail); CompletableFuture<Void> deleteMail(Mail mail); http://git-wip-us.apache.org/repos/asf/james-project/blob/b70ca1a7/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java index a243407..9339d6e 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java @@ -21,9 +21,7 @@ package org.apache.james.queue.rabbitmq.view.cassandra; import java.time.Clock; import java.time.Instant; -import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import javax.inject.Inject; @@ -39,7 +37,6 @@ class CassandraMailQueueMailStore { private final BrowseStartDAO browseStartDao; private final CassandraMailQueueViewConfiguration configuration; private final Clock clock; - private final Set<MailQueueName> initialInserted; @Inject CassandraMailQueueMailStore(EnqueuedMailsDAO enqueuedMailsDao, @@ -50,27 +47,17 @@ class CassandraMailQueueMailStore { this.browseStartDao = browseStartDao; this.configuration = configuration; this.clock = clock; - this.initialInserted = ConcurrentHashMap.newKeySet(); } CompletableFuture<Void> storeMailInEnqueueTable(Mail mail, MailQueueName mailQueueName, Instant enqueuedTime) { EnqueuedMail enqueuedMail = convertToEnqueuedMail(mail, mailQueueName, enqueuedTime); - return enqueuedMailsDao.insert(enqueuedMail) - .thenCompose(any -> initBrowseStartIfNeeded(mailQueueName, enqueuedMail.getTimeRangeStart())); + return enqueuedMailsDao.insert(enqueuedMail); } - private CompletableFuture<Void> initBrowseStartIfNeeded(MailQueueName mailQueueName, Instant sliceStartAt) { - if (!initialInserted.contains(mailQueueName)) { - return tryInsertBrowseStart(mailQueueName, sliceStartAt); - } - return CompletableFuture.completedFuture(null); - } - - private CompletableFuture<Void> tryInsertBrowseStart(MailQueueName mailQueueName, Instant sliceStartAt) { + CompletableFuture<Void> initializeBrowseStart(MailQueueName mailQueueName) { return browseStartDao - .insertInitialBrowseStart(mailQueueName, sliceStartAt) - .thenAccept(any -> initialInserted.add(mailQueueName)); + .insertInitialBrowseStart(mailQueueName, currentSliceStartInstant()); } private EnqueuedMail convertToEnqueuedMail(Mail mail, MailQueueName mailQueueName, Instant enqueuedTime) { http://git-wip-us.apache.org/repos/asf/james-project/blob/b70ca1a7/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java index ad37b5b..1876f88 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java @@ -67,6 +67,12 @@ public class CassandraMailQueueView implements MailQueueView { } @Override + public void initialize(MailQueueName mailQueueName) { + storeHelper.initializeBrowseStart(mailQueueName) + .join(); + } + + @Override public CompletableFuture<Void> storeMail(Instant enqueuedTime, Mail mail) { return storeHelper.storeMailInEnqueueTable(mail, mailQueueName, enqueuedTime); } @@ -88,5 +94,4 @@ public class CassandraMailQueueView implements MailQueueView { public long getSize() { return Iterators.size(browse()); } - } http://git-wip-us.apache.org/repos/asf/james-project/blob/b70ca1a7/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index f1f31be..8e495b4 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -213,6 +213,15 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ .containsExactly("3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5"); } + @Test + void mailQueueShouldBeInitializedWhenCreating(CassandraCluster cassandra) { + String name = "myQueue"; + mailQueueFactory.createQueue(name); + + boolean initialized = CassandraMailQueueViewTestFactory.isInitialized(cassandra.getConf(), MailQueueName.fromString(name)); + assertThat(initialized).isTrue(); + } + @Disabled @Override public void clearShouldNotFailWhenBrowsingIterating() { http://git-wip-us.apache.org/repos/asf/james-project/blob/b70ca1a7/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java index 95733c2..b37fb91 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java @@ -20,10 +20,12 @@ package org.apache.james.queue.rabbitmq.view.cassandra; import java.time.Clock; +import java.util.Optional; import java.util.concurrent.ThreadLocalRandom; import org.apache.james.backends.cassandra.init.CassandraTypesProvider; import org.apache.james.backends.cassandra.utils.CassandraUtils; +import org.apache.james.queue.rabbitmq.MailQueueName; import com.datastax.driver.core.Session; @@ -45,4 +47,11 @@ public class CassandraMailQueueViewTestFactory { cassandraMailQueueBrowser, cassandraMailQueueMailDelete); } + + public static boolean isInitialized(Session session, MailQueueName mailQueueName) { + BrowseStartDAO browseStartDao = new BrowseStartDAO(session); + return browseStartDao.findBrowseStart(mailQueueName) + .thenApply(Optional::isPresent) + .join(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org