JAMES-2544 Initialize RabbitMQ mail queues when creating

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/b70ca1a7
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/b70ca1a7
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/b70ca1a7

Branch: refs/heads/master
Commit: b70ca1a7c8cc8e9af66f24b3e15ef73a8022d09f
Parents: 6be6b16
Author: Antoine Duprat <adup...@linagora.com>
Authored: Fri Sep 14 15:26:44 2018 +0200
Committer: Benoit Tellier <btell...@linagora.com>
Committed: Wed Sep 26 09:22:06 2018 +0700

----------------------------------------------------------------------
 .../james/queue/rabbitmq/RabbitMQMailQueue.java  |  2 ++
 .../queue/rabbitmq/view/api/MailQueueView.java   |  3 +++
 .../cassandra/CassandraMailQueueMailStore.java   | 19 +++----------------
 .../view/cassandra/CassandraMailQueueView.java   |  7 ++++++-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java    |  9 +++++++++
 .../CassandraMailQueueViewTestFactory.java       |  9 +++++++++
 6 files changed, 32 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/b70ca1a7/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index c653943..50853aa 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -69,6 +69,8 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
         }
 
         RabbitMQMailQueue create(MailQueueName mailQueueName) {
+            mailQueueView.initialize(mailQueueName);
+
             return new RabbitMQMailQueue(metricFactory, mailQueueName,
                 new Enqueuer(mailQueueName, rabbitClient, mimeMessageStore, mailReferenceSerializer,
                     metricFactory, mailQueueView, clock),

http://git-wip-us.apache.org/repos/asf/james-project/blob/b70ca1a7/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
index 10ad42d..0455ff8 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -23,10 +23,13 @@ import java.time.Instant;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.rabbitmq.MailQueueName;
 import org.apache.mailet.Mail;
 
 public interface MailQueueView {
 
+    void initialize(MailQueueName mailQueueName);
+
     CompletableFuture<Void> storeMail(Instant enqueuedTime, Mail mail);
 
     CompletableFuture<Void> deleteMail(Mail mail);

http://git-wip-us.apache.org/repos/asf/james-project/blob/b70ca1a7/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
index a243407..9339d6e 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java
@@ -21,9 +21,7 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import java.time.Clock;
 import java.time.Instant;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 
 import javax.inject.Inject;
 
@@ -39,7 +37,6 @@ class CassandraMailQueueMailStore {
     private final BrowseStartDAO browseStartDao;
     private final CassandraMailQueueViewConfiguration configuration;
     private final Clock clock;
-    private final Set<MailQueueName> initialInserted;
 
     @Inject
     CassandraMailQueueMailStore(EnqueuedMailsDAO enqueuedMailsDao,
@@ -50,27 +47,17 @@ class CassandraMailQueueMailStore {
         this.browseStartDao = browseStartDao;
         this.configuration = configuration;
         this.clock = clock;
-        this.initialInserted = ConcurrentHashMap.newKeySet();
     }
 
     CompletableFuture<Void> storeMailInEnqueueTable(Mail mail, MailQueueName mailQueueName, Instant enqueuedTime) {
         EnqueuedMail enqueuedMail = convertToEnqueuedMail(mail, mailQueueName, enqueuedTime);
 
-        return enqueuedMailsDao.insert(enqueuedMail)
-            .thenCompose(any -> initBrowseStartIfNeeded(mailQueueName, enqueuedMail.getTimeRangeStart()));
+        return enqueuedMailsDao.insert(enqueuedMail);
     }
 
-    private CompletableFuture<Void> initBrowseStartIfNeeded(MailQueueName mailQueueName, Instant sliceStartAt) {
-        if (!initialInserted.contains(mailQueueName)) {
-            return tryInsertBrowseStart(mailQueueName, sliceStartAt);
-        }
-        return CompletableFuture.completedFuture(null);
-    }
-
-    private CompletableFuture<Void> tryInsertBrowseStart(MailQueueName mailQueueName, Instant sliceStartAt) {
+    CompletableFuture<Void> initializeBrowseStart(MailQueueName mailQueueName) {
         return browseStartDao
-            .insertInitialBrowseStart(mailQueueName, sliceStartAt)
-            .thenAccept(any -> initialInserted.add(mailQueueName));
+            .insertInitialBrowseStart(mailQueueName, currentSliceStartInstant());
     }
 
     private EnqueuedMail convertToEnqueuedMail(Mail mail, MailQueueName mailQueueName, Instant enqueuedTime) {

http://git-wip-us.apache.org/repos/asf/james-project/blob/b70ca1a7/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index ad37b5b..1876f88 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -67,6 +67,12 @@ public class CassandraMailQueueView implements MailQueueView {
     }
 
     @Override
+    public void initialize(MailQueueName mailQueueName) {
+        storeHelper.initializeBrowseStart(mailQueueName)
+            .join();
+    }
+
+    @Override
     public CompletableFuture<Void> storeMail(Instant enqueuedTime, Mail mail) {
         return storeHelper.storeMailInEnqueueTable(mail, mailQueueName, enqueuedTime);
     }
@@ -88,5 +94,4 @@ public class CassandraMailQueueView implements MailQueueView {
     public long getSize() {
         return Iterators.size(browse());
     }
-
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/b70ca1a7/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index f1f31be..8e495b4 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -213,6 +213,15 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ
             .containsExactly("3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5");
     }
 
+    @Test
+    void mailQueueShouldBeInitializedWhenCreating(CassandraCluster cassandra) {
+        String name = "myQueue";
+        mailQueueFactory.createQueue(name);
+
+        boolean initialized = CassandraMailQueueViewTestFactory.isInitialized(cassandra.getConf(), MailQueueName.fromString(name));
+        assertThat(initialized).isTrue();
+    }
+
     @Disabled
     @Override
     public void clearShouldNotFailWhenBrowsingIterating() {

http://git-wip-us.apache.org/repos/asf/james-project/blob/b70ca1a7/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
----------------------------------------------------------------------
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
index 95733c2..b37fb91 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueViewTestFactory.java
@@ -20,10 +20,12 @@
 package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import java.time.Clock;
+import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.james.backends.cassandra.init.CassandraTypesProvider;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
+import org.apache.james.queue.rabbitmq.MailQueueName;
 
 import com.datastax.driver.core.Session;
 
@@ -45,4 +47,11 @@ public class CassandraMailQueueViewTestFactory {
             cassandraMailQueueBrowser,
             cassandraMailQueueMailDelete);
     }
+
+    public static boolean isInitialized(Session session, MailQueueName mailQueueName) {
+        BrowseStartDAO browseStartDao = new BrowseStartDAO(session);
+        return browseStartDao.findBrowseStart(mailQueueName)
+            .thenApply(Optional::isPresent)
+            .join();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to