This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 17f8fc5696b56c159a23d6d4abbde0d996756baf
Author: Rémi Kowalski <rkowal...@linagora.com>
AuthorDate: Mon Jul 6 15:16:48 2020 +0200

    JAMES-3296 Add republishing to RabbitMQMailQueue from Cassandra capability
---
 .../james/webadmin/dto/MailQueueItemDTOTest.java   |   3 +-
 .../james/queue/api/ManageableMailQueue.java       |  18 +-
 .../james/queue/file/FileCacheableMailQueue.java   |   2 +-
 .../james/queue/jms/JMSCacheableMailQueue.java     |   2 +-
 .../james/queue/memory/MemoryMailQueueFactory.java |   4 +-
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |   6 +-
 .../org/apache/james/queue/rabbitmq/Enqueuer.java  |   8 +
 .../james/queue/rabbitmq/RabbitMQMailQueue.java    |  17 +-
 .../queue/rabbitmq/view/api/MailQueueView.java     |   9 +-
 .../view/cassandra/CassandraMailQueueBrowser.java  |  64 ++++++-
 .../view/cassandra/CassandraMailQueueView.java     |  22 ++-
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 193 ++++++++++++++++++++-
 .../rabbitmq/RabbitMqMailQueueFactoryTest.java     |   3 +-
 13 files changed, 322 insertions(+), 29 deletions(-)

diff --git 
a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/dto/MailQueueItemDTOTest.java
 
b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/dto/MailQueueItemDTOTest.java
index 18b14d2..4b177de 100644
--- 
a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/dto/MailQueueItemDTOTest.java
+++ 
b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/dto/MailQueueItemDTOTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.james.core.MailAddress;
 import org.apache.james.queue.api.Mails;
+import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView;
 import org.apache.mailet.base.test.FakeMail;
 import org.assertj.core.api.JUnitSoftAssertions;
@@ -54,7 +55,7 @@ public class MailQueueItemDTOTest {
     public void fromShouldCreateTheRightObject() throws Exception {
         FakeMail mail = Mails.defaultMail().name("name").build();
         ZonedDateTime date = ZonedDateTime.parse("2018-01-02T11:22:02Z");
-        MailQueueItemView mailQueueItemView = new MailQueueItemView(mail, 
date);
+        MailQueueItemView mailQueueItemView = new 
ManageableMailQueue.DefaultMailQueueItemView(mail, date);
         MailQueueItemDTO mailQueueItemDTO = 
MailQueueItemDTO.from(mailQueueItemView);
         List<String> expectedRecipients = mail.getRecipients().stream()
                 .map(MailAddress::asString)
diff --git 
a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
 
b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
index 20635a6..5309dbc 100644
--- 
a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
+++ 
b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java
@@ -91,20 +91,30 @@ public interface ManageableMailQueue extends MailQueue {
     /**
      * Represent a View over a queue {@link MailQueue.MailQueueItem}
      */
-    class MailQueueItemView {
+    interface MailQueueItemView {
+        Mail getMail();
+
+        Optional<ZonedDateTime> getNextDelivery();
+    }
+
+
+    /**
+     * Represent a View over a queue {@link MailQueue.MailQueueItem}
+     */
+    class DefaultMailQueueItemView implements MailQueueItemView {
 
         private final Mail mail;
         private final Optional<ZonedDateTime> nextDelivery;
 
-        public MailQueueItemView(Mail mail) {
+        public DefaultMailQueueItemView(Mail mail) {
             this(mail, Optional.empty());
         }
 
-        public MailQueueItemView(Mail mail, ZonedDateTime nextDelivery) {
+        public DefaultMailQueueItemView(Mail mail, ZonedDateTime nextDelivery) 
{
             this(mail, Optional.of(nextDelivery));
         }
 
-        public MailQueueItemView(Mail mail, Optional<ZonedDateTime> 
nextDelivery) {
+        public DefaultMailQueueItemView(Mail mail, Optional<ZonedDateTime> 
nextDelivery) {
             this.mail = mail;
             this.nextDelivery = nextDelivery;
         }
diff --git 
a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
 
b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
index d32968e..cb69709 100644
--- 
a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
+++ 
b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java
@@ -477,7 +477,7 @@ public class FileCacheableMailQueue implements 
ManageableMailQueue {
                 while (items.hasNext()) {
                     try (ObjectInputStream in = new ObjectInputStream(new 
FileInputStream(items.next().getObjectFile()))) {
                         final Mail mail = (Mail) in.readObject();
-                        item = new MailQueueItemView(mail, 
getNextDelivery(mail));
+                        item = new DefaultMailQueueItemView(mail, 
getNextDelivery(mail));
                         return true;
                     } catch (IOException | ClassNotFoundException e) {
                         LOGGER.info("Unable to load mail", e);
diff --git 
a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
 
b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
index 4e7cee9..1084b17 100644
--- 
a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
+++ 
b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
@@ -647,7 +647,7 @@ public class JMSCacheableMailQueue implements 
ManageableMailQueue, JMSSupport, M
                     while (hasNext()) {
                         try {
                             Message m = messages.nextElement();
-                            return new MailQueueItemView(createMail(m), 
nextDeliveryDate(m));
+                            return new DefaultMailQueueItemView(createMail(m), 
nextDeliveryDate(m));
                         } catch (MessagingException | JMSException e) {
                             LOGGER.error("Unable to browse queue", e);
                         }
diff --git 
a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
 
b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index da9e8b1..93f12ca 100644
--- 
a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ 
b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -221,9 +221,9 @@ public class MemoryMailQueueFactory implements 
MailQueueFactory<MemoryMailQueueF
 
         @Override
         public MailQueueIterator browse() throws MailQueueException {
-            Iterator<MailQueueItemView> underlying = 
ImmutableList.copyOf(mailItems)
+            Iterator<DefaultMailQueueItemView> underlying = 
ImmutableList.copyOf(mailItems)
                 .stream()
-                .map(item -> new MailQueueItemView(item.getMail(), 
item.delivery))
+                .map(item -> new DefaultMailQueueItemView(item.getMail(), 
item.delivery))
                 .iterator();
 
             return new MailQueueIterator() {
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index fe209f9..b477335 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -32,6 +32,7 @@ import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
 import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,13 +82,13 @@ class Dequeuer implements Closeable {
     private final MailLoader mailLoader;
     private final Metric dequeueMetric;
     private final MailReferenceSerializer mailReferenceSerializer;
-    private final MailQueueView mailQueueView;
+    private final 
MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> 
mailQueueView;
     private final Receiver receiver;
     private final Flux<AcknowledgableDelivery> flux;
 
     Dequeuer(MailQueueName name, ReceiverProvider receiverProvider, MailLoader 
mailLoader,
              MailReferenceSerializer serializer, MetricFactory metricFactory,
-             MailQueueView mailQueueView, MailQueueFactory.PrefetchCount 
prefetchCount) {
+             
MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> 
mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) {
         this.mailLoader = mailLoader;
         this.mailReferenceSerializer = serializer;
         this.mailQueueView = mailQueueView;
@@ -162,5 +163,4 @@ class Dequeuer implements Closeable {
                 return Mono.empty();
             });
     }
-
 }
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index c134798..af30c14 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -34,6 +34,7 @@ import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
 import org.apache.mailet.Mail;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -76,6 +77,13 @@ class Enqueuer {
             .block();
     }
 
+    Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView 
item) {
+        Mail mail = item.getMail();
+        return Mono.fromCallable(() -> new MailReference(item.getEnqueuedId(), 
mail, item.getEnqueuedPartsId()))
+            
.flatMap(Throwing.function(this::publishReferenceToRabbit).sneakyThrow())
+            .then();
+    }
+
     private Mono<MimeMessagePartsId> saveMail(Mail mail) throws 
MailQueue.MailQueueException {
         try {
             return mimeMessageStore.save(mail.getMessage());
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 3ed3ba9..5e838db 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -20,20 +20,24 @@
 package org.apache.james.queue.rabbitmq;
 
 import java.time.Duration;
+import java.time.Instant;
 
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
 import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
+import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
 public class RabbitMQMailQueue implements ManageableMailQueue {
 
@@ -43,12 +47,12 @@ public class RabbitMQMailQueue implements 
ManageableMailQueue {
     private final MetricFactory metricFactory;
     private final Enqueuer enqueuer;
     private final Dequeuer dequeuer;
-    private final MailQueueView mailQueueView;
+    private final 
MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> 
mailQueueView;
     private final MailQueueItemDecoratorFactory decoratorFactory;
 
     RabbitMQMailQueue(MetricFactory metricFactory, MailQueueName name,
                       Enqueuer enqueuer, Dequeuer dequeuer,
-                      MailQueueView mailQueueView, 
MailQueueItemDecoratorFactory decoratorFactory) {
+                      
MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> 
mailQueueView, MailQueueItemDecoratorFactory decoratorFactory) {
         this.metricFactory = metricFactory;
         this.name = name;
         this.enqueuer = enqueuer;
@@ -119,4 +123,13 @@ public class RabbitMQMailQueue implements 
ManageableMailQueue {
             .add("name", name)
             .toString();
     }
+
+    public Flux<String> republishNotProcessedMails(Instant olderThan) {
+        Function<CassandraMailQueueBrowser.CassandraMailQueueItemView, 
Mono<String>> requeue = item ->
+            enqueuer.reQueue(item)
+                .thenReturn(item.getMail().getName());
+
+        return mailQueueView.browseOlderThanReactive(olderThan)
+            .flatMap(requeue);
+    }
 }
\ No newline at end of file
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
index 33eec80..921a08b 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java
@@ -19,14 +19,17 @@
 
 package org.apache.james.queue.rabbitmq.view.api;
 
+import java.time.Instant;
+
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.rabbitmq.EnqueueId;
 import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-public interface MailQueueView {
+public interface MailQueueView<V extends 
ManageableMailQueue.MailQueueItemView> {
 
     interface Factory {
         MailQueueView create(MailQueueName mailQueueName);
@@ -42,5 +45,9 @@ public interface MailQueueView {
 
     ManageableMailQueue.MailQueueIterator browse();
 
+    Flux<V> browseReactive();
+
+    Flux<V> browseOlderThanReactive(Instant olderThan);
+
     long getSize();
 }
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
index 91223ad..455025c 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java
@@ -24,17 +24,21 @@ import static 
org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlice
 
 import java.time.Clock;
 import java.time.Instant;
+import java.time.ZonedDateTime;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.Optional;
 
 import javax.inject.Inject;
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.blob.api.Store;
 import org.apache.james.blob.mail.MimeMessagePartsId;
 import org.apache.james.blob.mail.MimeMessageStore;
 import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.queue.rabbitmq.EnqueueId;
 import org.apache.james.queue.rabbitmq.EnqueuedItem;
 import org.apache.james.queue.rabbitmq.MailQueueName;
 import 
org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration;
@@ -52,9 +56,9 @@ public class CassandraMailQueueBrowser {
 
     static class CassandraMailQueueIterator implements 
ManageableMailQueue.MailQueueIterator {
 
-        private final Iterator<ManageableMailQueue.MailQueueItemView> iterator;
+        private final Iterator<CassandraMailQueueItemView> iterator;
 
-        
CassandraMailQueueIterator(Iterator<ManageableMailQueue.MailQueueItemView> 
iterator) {
+        CassandraMailQueueIterator(Iterator<CassandraMailQueueItemView> 
iterator) {
             Preconditions.checkNotNull(iterator);
 
             this.iterator = iterator;
@@ -71,7 +75,7 @@ public class CassandraMailQueueBrowser {
         }
 
         @Override
-        public ManageableMailQueue.MailQueueItemView next() {
+        public CassandraMailQueueItemView next() {
             return iterator.next();
         }
     }
@@ -100,10 +104,24 @@ public class CassandraMailQueueBrowser {
         this.clock = clock;
     }
 
-    Flux<ManageableMailQueue.MailQueueItemView> browse(MailQueueName 
queueName) {
+    Flux<CassandraMailQueueItemView> browse(MailQueueName queueName) {
         return browseReferences(queueName)
             .flatMapSequential(this::toMailFuture)
-            .map(ManageableMailQueue.MailQueueItemView::new);
+            .map(CassandraMailQueueItemView::new);
+    }
+
+    Flux<CassandraMailQueueItemView> browseOlderThan(MailQueueName queueName, 
Instant olderThan) {
+        return browseReferencesOlderThan(queueName, olderThan)
+            .flatMapSequential(this::toMailFuture)
+            .map(CassandraMailQueueItemView::new);
+    }
+
+    Flux<EnqueuedItemWithSlicingContext> 
browseReferencesOlderThan(MailQueueName queueName, Instant olderThan) {
+        return browseStartDao.findBrowseStart(queueName)
+            .flatMapMany(this::allSlicesStartingAt)
+            .filter(slice -> slice.getStartSliceInstant().isBefore(olderThan))
+            .flatMapSequential(slice -> browseSlice(queueName, slice))
+            .filter(item -> 
item.getEnqueuedItem().getEnqueuedTime().isBefore(olderThan));
     }
 
     Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName 
queueName) {
@@ -112,10 +130,10 @@ public class CassandraMailQueueBrowser {
             .flatMapSequential(slice -> browseSlice(queueName, slice));
     }
 
-    private Mono<Mail> toMailFuture(EnqueuedItemWithSlicingContext 
enqueuedItemWithSlicingContext) {
+    private Mono<Pair<EnqueuedItem, Mail>> 
toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) {
         EnqueuedItem enqueuedItem = 
enqueuedItemWithSlicingContext.getEnqueuedItem();
         return mimeMessageStore.read(enqueuedItem.getPartsId())
-            .map(mimeMessage -> toMail(enqueuedItem, mimeMessage));
+            .map(mimeMessage -> Pair.of(enqueuedItem, toMail(enqueuedItem, 
mimeMessage)));
     }
 
     private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) {
@@ -151,4 +169,36 @@ public class CassandraMailQueueBrowser {
             .range(0, configuration.getBucketCount())
             .map(BucketId::of);
     }
+
+    public static class CassandraMailQueueItemView implements 
ManageableMailQueue.MailQueueItemView {
+        private final EnqueuedItem enqueuedItem;
+        private final Mail mail;
+
+        public CassandraMailQueueItemView(Pair<EnqueuedItem, Mail> pair) {
+            this(pair.getLeft(), pair.getRight());
+        }
+
+        public CassandraMailQueueItemView(EnqueuedItem enqueuedItem, Mail 
mail) {
+            this.enqueuedItem = enqueuedItem;
+            this.mail = mail;
+        }
+
+        public EnqueueId getEnqueuedId() {
+            return enqueuedItem.getEnqueueId();
+        }
+
+        public MimeMessagePartsId getEnqueuedPartsId() {
+            return enqueuedItem.getPartsId();
+        }
+
+        @Override
+        public Mail getMail() {
+            return mail;
+        }
+
+        @Override
+        public Optional<ZonedDateTime> getNextDelivery() {
+            return Optional.empty();
+        }
+    }
 }
diff --git 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
index b82de97..72b26a3 100644
--- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
+++ 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java
@@ -21,6 +21,8 @@ package org.apache.james.queue.rabbitmq.view.cassandra;
 
 import static org.apache.james.util.FunctionalUtils.negate;
 
+import java.time.Instant;
+
 import javax.inject.Inject;
 
 import org.apache.james.queue.api.ManageableMailQueue;
@@ -33,10 +35,11 @@ import 
org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai
 import 
org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement;
 import 
org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 
-public class CassandraMailQueueView implements MailQueueView {
+public class CassandraMailQueueView implements 
MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> {
 
     public static class Factory implements MailQueueView.Factory {
         private final CassandraMailQueueMailStore storeHelper;
@@ -91,13 +94,24 @@ public class CassandraMailQueueView implements 
MailQueueView {
     @Override
     public ManageableMailQueue.MailQueueIterator browse() {
         return new CassandraMailQueueBrowser.CassandraMailQueueIterator(
-            cassandraMailQueueBrowser.browse(mailQueueName)
-                .subscribeOn(Schedulers.elastic())
+            browseReactive()
                 .toIterable()
                 .iterator());
     }
 
     @Override
+    public Flux<CassandraMailQueueBrowser.CassandraMailQueueItemView> 
browseReactive() {
+        return cassandraMailQueueBrowser.browse(mailQueueName)
+            .subscribeOn(Schedulers.elastic());
+    }
+
+    @Override
+    public Flux<CassandraMailQueueBrowser.CassandraMailQueueItemView> 
browseOlderThanReactive(Instant olderThan) {
+        return cassandraMailQueueBrowser.browseOlderThan(mailQueueName, 
olderThan)
+            .subscribeOn(Schedulers.elastic());
+    }
+
+    @Override
     public long getSize() {
         return cassandraMailQueueBrowser.browseReferences(mailQueueName)
             .count()
@@ -133,6 +147,6 @@ public class CassandraMailQueueView implements 
MailQueueView {
     @Override
     public Mono<Boolean> isPresent(EnqueueId id) {
         return cassandraMailQueueMailDelete.isDeleted(id, mailQueueName)
-                .map(negate());
+            .map(negate());
     }
 }
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index bf2d209..8808bfc 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -23,6 +23,7 @@ import static java.time.temporal.ChronoUnit.HOURS;
 import static 
org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
 import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -75,11 +76,12 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.ArgumentCaptor;
 
 import com.github.fge.lambdas.Throwing;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.Sender;
 
 class RabbitMQMailQueueTest {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new 
HashBlobId.Factory();
@@ -132,7 +134,7 @@ class RabbitMQMailQueueTest {
         }
 
         @Override
-        public MailQueue getMailQueue() {
+        public RabbitMQMailQueue getMailQueue() {
             return mailQueue;
         }
 
@@ -288,6 +290,177 @@ class RabbitMQMailQueueTest {
                 .containsExactly(name1, name2, name3);
         }
 
+        @Test
+        void 
messagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessed() throws 
Exception {
+            clock.setInstant(Instant.now().minus(Duration.ofHours(2)));
+            String name1 = "myMail1";
+            String name2 = "myMail2";
+            String name3 = "myMail3";
+            Flux<MailQueue.MailQueueItem> dequeueFlux = 
Flux.from(getMailQueue().deQueue());
+
+            // Avoid early processing and prefetching
+            Sender sender = rabbitMQExtension.getSender();
+
+            suspendDequeuing(sender);
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name2)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name3)
+                .build());
+
+            resumeDequeuing(sender);
+            assertThat(getMailQueue()
+                    
.republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1)))
+                    .collectList()
+                    .block())
+                .containsExactlyInAnyOrder(name1, name2, name3);
+
+            List<MailQueue.MailQueueItem> items = 
dequeueFlux.take(Duration.ofSeconds(10)).collectList().block();
+
+            assertThat(items)
+                .extracting(item -> item.getMail().getName())
+                .containsExactlyInAnyOrder(name1, name2, name3);
+        }
+
+        @Test
+        void 
onlyOldMessagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessed() 
throws Exception {
+            clock.setInstant(Instant.now().minus(Duration.ofHours(2)));
+            String name1 = "myMail1";
+            String name2 = "myMail2";
+            String name3 = "myMail3";
+            Flux<MailQueue.MailQueueItem> dequeueFlux = 
Flux.from(getMailQueue().deQueue());
+
+            // Avoid early processing and prefetching
+            Sender sender = rabbitMQExtension.getSender();
+
+            suspendDequeuing(sender);
+
+            getMailQueue().enQueue(defaultMail()
+                    .name(name1)
+                    .build());
+
+            getMailQueue().enQueue(defaultMail()
+                    .name(name2)
+                    .build());
+
+            clock.setInstant(Instant.now());
+            getMailQueue().enQueue(defaultMail()
+                    .name(name3)
+                    .build());
+
+            resumeDequeuing(sender);
+            assertThat(getMailQueue()
+                    
.republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1)))
+                    .collectList()
+                    .block())
+                .containsExactlyInAnyOrder(name1, name2);
+
+            List<MailQueue.MailQueueItem> items = 
dequeueFlux.take(Duration.ofSeconds(10)).collectList().block();
+
+            assertThat(items)
+                    .extracting(item -> item.getMail().getName())
+                    .containsExactlyInAnyOrder(name1, name2);
+        }
+
+        @Test
+        void messagesShouldBeProcessedAfterTwoMailsReprocessing() throws 
Exception {
+            clock.setInstant(Instant.now().minus(Duration.ofHours(2)));
+            String name1 = "myMail1";
+            String name2 = "myMail2";
+            String name3 = "myMail3";
+            Flux<MailQueue.MailQueueItem> dequeueFlux = 
Flux.from(getMailQueue().deQueue());
+
+            // Avoid early processing and prefetching
+            Sender sender = rabbitMQExtension.getSender();
+
+            suspendDequeuing(sender);
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name2)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name3)
+                .build());
+
+            assertThat(getMailQueue()
+                    
.republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1)))
+                    .collectList()
+                    .block())
+                .containsExactlyInAnyOrder(name1, name2, name3);
+            resumeDequeuing(sender);
+            assertThat(getMailQueue()
+                    
.republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1)))
+                    .collectList()
+                    .block())
+                .containsExactlyInAnyOrder(name1, name2, name3);
+
+            List<MailQueue.MailQueueItem> items = 
dequeueFlux.take(Duration.ofSeconds(10)).collectList().block();
+
+            assertThat(items)
+                .extracting(item -> item.getMail().getName())
+                .containsExactlyInAnyOrder(name1, name2, name3);
+        }
+
+        @Test
+        void 
messagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessedAndNewMessagesShouldNotBeLost()
 throws Exception {
+            clock.setInstant(Instant.now().minus(Duration.ofHours(2)));
+            String name1 = "myMail1";
+            String name2 = "myMail2";
+            String name3 = "myMail3";
+            Flux<MailQueue.MailQueueItem> dequeueFlux = 
Flux.from(getMailQueue().deQueue());
+
+            // Avoid early processing and prefetching
+            Sender sender = rabbitMQExtension.getSender();
+
+            suspendDequeuing(sender);
+            //mail send when rabbit down
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+            resumeDequeuing(sender);
+
+            //mail send when rabbit is up again and before rebuild
+            clock.setInstant(Instant.now());
+            getMailQueue().enQueue(defaultMail()
+                .name(name3)
+                .build());
+
+            Flux.merge(Mono.fromCallable(() -> {
+                //mail send concurently with rebuild
+                getMailQueue().enQueue(defaultMail()
+                    .name(name2)
+                    .build());
+                return true;
+
+            }), Mono.fromRunnable(() ->
+                assertThat(getMailQueue()
+                        
.republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1)))
+                        .collectList()
+                        .block())
+                    .containsOnly(name1)
+            ))
+            .then()
+            .block(Duration.ofSeconds(10));
+
+            List<MailQueue.MailQueueItem> items = 
dequeueFlux.take(Duration.ofSeconds(10)).collectList().block();
+
+            assertThat(items)
+                .extracting(item -> item.getMail().getName())
+                .containsExactlyInAnyOrder(name1, name2, name3);
+        }
+
         private void enqueueSomeMails(Function<Integer, String> namePattern, 
int emailCount) {
             IntStream.rangeClosed(1, emailCount)
                 .forEach(Throwing.intConsumer(i -> enQueue(defaultMail()
@@ -485,6 +658,22 @@ class RabbitMQMailQueueTest {
             Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS)
                 .untilAsserted(() -> 
assertThat(deadLetteredCount.get()).isEqualTo(1));
         }
+
+        private void resumeDequeuing(Sender sender) {
+            sender.bindQueue(getMailQueueBindingSpecification()).block();
+        }
+
+        private void suspendDequeuing(Sender sender) {
+            sender.unbindQueue(getMailQueueBindingSpecification()).block();
+        }
+
+        private BindingSpecification getMailQueueBindingSpecification() {
+            MailQueueName mailQueueName = 
MailQueueName.fromString(getMailQueue().getName().asString());
+            return BindingSpecification.binding()
+                    .exchange(mailQueueName.toRabbitExchangeName().asString())
+                    .queue(mailQueueName.toWorkQueueName().asString())
+                    .routingKey(EMPTY_ROUTING_KEY);
+        }
     }
 
     @Nested
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
index c40fb11..2917b35 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java
@@ -35,6 +35,7 @@ import org.apache.james.queue.api.MailQueueFactoryContract;
 import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory;
 import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
+import 
org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -52,7 +53,7 @@ class RabbitMqMailQueueFactoryTest implements 
MailQueueFactoryContract<RabbitMQM
     void setup() throws Exception {
         MimeMessageStore.Factory mimeMessageStoreFactory = 
mock(MimeMessageStore.Factory.class);
         MailQueueView.Factory mailQueueViewFactory = 
mock(MailQueueView.Factory.class);
-        MailQueueView mailQueueView = mock(MailQueueView.class);
+        MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> 
mailQueueView = mock(MailQueueView.class);
         when(mailQueueViewFactory.create(any()))
             .thenReturn(mailQueueView);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to