This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 17f8fc5696b56c159a23d6d4abbde0d996756baf Author: Rémi Kowalski <rkowal...@linagora.com> AuthorDate: Mon Jul 6 15:16:48 2020 +0200 JAMES-3296 Add republishing to RabbitMQMailQueue from Cassandra capability --- .../james/webadmin/dto/MailQueueItemDTOTest.java | 3 +- .../james/queue/api/ManageableMailQueue.java | 18 +- .../james/queue/file/FileCacheableMailQueue.java | 2 +- .../james/queue/jms/JMSCacheableMailQueue.java | 2 +- .../james/queue/memory/MemoryMailQueueFactory.java | 4 +- .../org/apache/james/queue/rabbitmq/Dequeuer.java | 6 +- .../org/apache/james/queue/rabbitmq/Enqueuer.java | 8 + .../james/queue/rabbitmq/RabbitMQMailQueue.java | 17 +- .../queue/rabbitmq/view/api/MailQueueView.java | 9 +- .../view/cassandra/CassandraMailQueueBrowser.java | 64 ++++++- .../view/cassandra/CassandraMailQueueView.java | 22 ++- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 193 ++++++++++++++++++++- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 3 +- 13 files changed, 322 insertions(+), 29 deletions(-) diff --git a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/dto/MailQueueItemDTOTest.java b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/dto/MailQueueItemDTOTest.java index 18b14d2..4b177de 100644 --- a/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/dto/MailQueueItemDTOTest.java +++ b/server/protocols/webadmin/webadmin-mailqueue/src/test/java/org/apache/james/webadmin/dto/MailQueueItemDTOTest.java @@ -25,6 +25,7 @@ import java.util.List; import org.apache.james.core.MailAddress; import org.apache.james.queue.api.Mails; +import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.api.ManageableMailQueue.MailQueueItemView; import org.apache.mailet.base.test.FakeMail; import org.assertj.core.api.JUnitSoftAssertions; @@ -54,7 +55,7 @@ public class MailQueueItemDTOTest { public void fromShouldCreateTheRightObject() throws Exception { 
FakeMail mail = Mails.defaultMail().name("name").build(); ZonedDateTime date = ZonedDateTime.parse("2018-01-02T11:22:02Z"); - MailQueueItemView mailQueueItemView = new MailQueueItemView(mail, date); + MailQueueItemView mailQueueItemView = new ManageableMailQueue.DefaultMailQueueItemView(mail, date); MailQueueItemDTO mailQueueItemDTO = MailQueueItemDTO.from(mailQueueItemView); List<String> expectedRecipients = mail.getRecipients().stream() .map(MailAddress::asString) diff --git a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java index 20635a6..5309dbc 100644 --- a/server/queue/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java +++ b/server/queue/queue-api/src/main/java/org/apache/james/queue/api/ManageableMailQueue.java @@ -91,20 +91,30 @@ public interface ManageableMailQueue extends MailQueue { /** * Represent a View over a queue {@link MailQueue.MailQueueItem} */ - class MailQueueItemView { + interface MailQueueItemView { + Mail getMail(); + + Optional<ZonedDateTime> getNextDelivery(); + } + + + /** + * Represent a View over a queue {@link MailQueue.MailQueueItem} + */ + class DefaultMailQueueItemView implements MailQueueItemView { private final Mail mail; private final Optional<ZonedDateTime> nextDelivery; - public MailQueueItemView(Mail mail) { + public DefaultMailQueueItemView(Mail mail) { this(mail, Optional.empty()); } - public MailQueueItemView(Mail mail, ZonedDateTime nextDelivery) { + public DefaultMailQueueItemView(Mail mail, ZonedDateTime nextDelivery) { this(mail, Optional.of(nextDelivery)); } - public MailQueueItemView(Mail mail, Optional<ZonedDateTime> nextDelivery) { + public DefaultMailQueueItemView(Mail mail, Optional<ZonedDateTime> nextDelivery) { this.mail = mail; this.nextDelivery = nextDelivery; } diff --git 
a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java index d32968e..cb69709 100644 --- a/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java +++ b/server/queue/queue-file/src/main/java/org/apache/james/queue/file/FileCacheableMailQueue.java @@ -477,7 +477,7 @@ public class FileCacheableMailQueue implements ManageableMailQueue { while (items.hasNext()) { try (ObjectInputStream in = new ObjectInputStream(new FileInputStream(items.next().getObjectFile()))) { final Mail mail = (Mail) in.readObject(); - item = new MailQueueItemView(mail, getNextDelivery(mail)); + item = new DefaultMailQueueItemView(mail, getNextDelivery(mail)); return true; } catch (IOException | ClassNotFoundException e) { LOGGER.info("Unable to load mail", e); diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java index 4e7cee9..1084b17 100644 --- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java +++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java @@ -647,7 +647,7 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M while (hasNext()) { try { Message m = messages.nextElement(); - return new MailQueueItemView(createMail(m), nextDeliveryDate(m)); + return new DefaultMailQueueItemView(createMail(m), nextDeliveryDate(m)); } catch (MessagingException | JMSException e) { LOGGER.error("Unable to browse queue", e); } diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java index da9e8b1..93f12ca 100644 --- 
a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java +++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java @@ -221,9 +221,9 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF @Override public MailQueueIterator browse() throws MailQueueException { - Iterator<MailQueueItemView> underlying = ImmutableList.copyOf(mailItems) + Iterator<DefaultMailQueueItemView> underlying = ImmutableList.copyOf(mailItems) .stream() - .map(item -> new MailQueueItemView(item.getMail(), item.delivery)) + .map(item -> new DefaultMailQueueItemView(item.getMail(), item.delivery)) .iterator(); return new MailQueueIterator() { diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index fe209f9..b477335 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -32,6 +32,7 @@ import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.MailQueueFactory; import org.apache.james.queue.rabbitmq.view.api.DeleteCondition; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; +import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser; import org.apache.mailet.Mail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,13 +82,13 @@ class Dequeuer implements Closeable { private final MailLoader mailLoader; private final Metric dequeueMetric; private final MailReferenceSerializer mailReferenceSerializer; - private final MailQueueView mailQueueView; + private final MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView; private final Receiver receiver; private final Flux<AcknowledgableDelivery> flux; Dequeuer(MailQueueName 
name, ReceiverProvider receiverProvider, MailLoader mailLoader, MailReferenceSerializer serializer, MetricFactory metricFactory, - MailQueueView mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) { + MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView, MailQueueFactory.PrefetchCount prefetchCount) { this.mailLoader = mailLoader; this.mailReferenceSerializer = serializer; this.mailQueueView = mailQueueView; @@ -162,5 +163,4 @@ class Dequeuer implements Closeable { return Mono.empty(); }); } - } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java index c134798..af30c14 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -34,6 +34,7 @@ import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; +import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser; import org.apache.mailet.Mail; import com.fasterxml.jackson.core.JsonProcessingException; @@ -76,6 +77,13 @@ class Enqueuer { .block(); } + Mono<Void> reQueue(CassandraMailQueueBrowser.CassandraMailQueueItemView item) { + Mail mail = item.getMail(); + return Mono.fromCallable(() -> new MailReference(item.getEnqueuedId(), mail, item.getEnqueuedPartsId())) + .flatMap(Throwing.function(this::publishReferenceToRabbit).sneakyThrow()) + .then(); + } + private Mono<MimeMessagePartsId> saveMail(Mail mail) throws MailQueue.MailQueueException { try { return mimeMessageStore.save(mail.getMessage()); diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java 
b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index 3ed3ba9..5e838db 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -20,20 +20,24 @@ package org.apache.james.queue.rabbitmq; import java.time.Duration; +import java.time.Instant; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueueItemDecoratorFactory; import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.rabbitmq.view.api.DeleteCondition; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; +import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser; import org.apache.mailet.Mail; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; +import com.google.common.base.Function; import com.google.common.base.MoreObjects; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; public class RabbitMQMailQueue implements ManageableMailQueue { @@ -43,12 +47,12 @@ public class RabbitMQMailQueue implements ManageableMailQueue { private final MetricFactory metricFactory; private final Enqueuer enqueuer; private final Dequeuer dequeuer; - private final MailQueueView mailQueueView; + private final MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView; private final MailQueueItemDecoratorFactory decoratorFactory; RabbitMQMailQueue(MetricFactory metricFactory, MailQueueName name, Enqueuer enqueuer, Dequeuer dequeuer, - MailQueueView mailQueueView, MailQueueItemDecoratorFactory decoratorFactory) { + MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView, MailQueueItemDecoratorFactory decoratorFactory) { this.metricFactory = metricFactory; this.name = name; this.enqueuer = enqueuer; @@ -119,4 +123,13 
@@ public class RabbitMQMailQueue implements ManageableMailQueue { .add("name", name) .toString(); } + + public Flux<String> republishNotProcessedMails(Instant olderThan) { + Function<CassandraMailQueueBrowser.CassandraMailQueueItemView, Mono<String>> requeue = item -> + enqueuer.reQueue(item) + .thenReturn(item.getMail().getName()); + + return mailQueueView.browseOlderThanReactive(olderThan) + .flatMap(requeue); + } } \ No newline at end of file diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java index 33eec80..921a08b 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java @@ -19,14 +19,17 @@ package org.apache.james.queue.rabbitmq.view.api; +import java.time.Instant; + import org.apache.james.queue.api.ManageableMailQueue; import org.apache.james.queue.rabbitmq.EnqueueId; import org.apache.james.queue.rabbitmq.EnqueuedItem; import org.apache.james.queue.rabbitmq.MailQueueName; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public interface MailQueueView { +public interface MailQueueView<V extends ManageableMailQueue.MailQueueItemView> { interface Factory { MailQueueView create(MailQueueName mailQueueName); @@ -42,5 +45,9 @@ public interface MailQueueView { ManageableMailQueue.MailQueueIterator browse(); + Flux<V> browseReactive(); + + Flux<V> browseOlderThanReactive(Instant olderThan); + long getSize(); } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java index 91223ad..455025c 100644 --- 
a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueBrowser.java @@ -24,17 +24,21 @@ import static org.apache.james.queue.rabbitmq.view.cassandra.model.BucketedSlice import java.time.Clock; import java.time.Instant; +import java.time.ZonedDateTime; import java.util.Comparator; import java.util.Iterator; +import java.util.Optional; import javax.inject.Inject; import javax.mail.MessagingException; import javax.mail.internet.MimeMessage; +import org.apache.commons.lang3.tuple.Pair; import org.apache.james.blob.api.Store; import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.blob.mail.MimeMessageStore; import org.apache.james.queue.api.ManageableMailQueue; +import org.apache.james.queue.rabbitmq.EnqueueId; import org.apache.james.queue.rabbitmq.EnqueuedItem; import org.apache.james.queue.rabbitmq.MailQueueName; import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMailQueueViewConfiguration; @@ -52,9 +56,9 @@ public class CassandraMailQueueBrowser { static class CassandraMailQueueIterator implements ManageableMailQueue.MailQueueIterator { - private final Iterator<ManageableMailQueue.MailQueueItemView> iterator; + private final Iterator<CassandraMailQueueItemView> iterator; - CassandraMailQueueIterator(Iterator<ManageableMailQueue.MailQueueItemView> iterator) { + CassandraMailQueueIterator(Iterator<CassandraMailQueueItemView> iterator) { Preconditions.checkNotNull(iterator); this.iterator = iterator; @@ -71,7 +75,7 @@ public class CassandraMailQueueBrowser { } @Override - public ManageableMailQueue.MailQueueItemView next() { + public CassandraMailQueueItemView next() { return iterator.next(); } } @@ -100,10 +104,24 @@ public class CassandraMailQueueBrowser { this.clock = clock; } - Flux<ManageableMailQueue.MailQueueItemView> 
browse(MailQueueName queueName) { + Flux<CassandraMailQueueItemView> browse(MailQueueName queueName) { return browseReferences(queueName) .flatMapSequential(this::toMailFuture) - .map(ManageableMailQueue.MailQueueItemView::new); + .map(CassandraMailQueueItemView::new); + } + + Flux<CassandraMailQueueItemView> browseOlderThan(MailQueueName queueName, Instant olderThan) { + return browseReferencesOlderThan(queueName, olderThan) + .flatMapSequential(this::toMailFuture) + .map(CassandraMailQueueItemView::new); + } + + Flux<EnqueuedItemWithSlicingContext> browseReferencesOlderThan(MailQueueName queueName, Instant olderThan) { + return browseStartDao.findBrowseStart(queueName) + .flatMapMany(this::allSlicesStartingAt) + .filter(slice -> slice.getStartSliceInstant().isBefore(olderThan)) + .flatMapSequential(slice -> browseSlice(queueName, slice)) + .filter(item -> item.getEnqueuedItem().getEnqueuedTime().isBefore(olderThan)); } Flux<EnqueuedItemWithSlicingContext> browseReferences(MailQueueName queueName) { @@ -112,10 +130,10 @@ public class CassandraMailQueueBrowser { .flatMapSequential(slice -> browseSlice(queueName, slice)); } - private Mono<Mail> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) { + private Mono<Pair<EnqueuedItem, Mail>> toMailFuture(EnqueuedItemWithSlicingContext enqueuedItemWithSlicingContext) { EnqueuedItem enqueuedItem = enqueuedItemWithSlicingContext.getEnqueuedItem(); return mimeMessageStore.read(enqueuedItem.getPartsId()) - .map(mimeMessage -> toMail(enqueuedItem, mimeMessage)); + .map(mimeMessage -> Pair.of(enqueuedItem, toMail(enqueuedItem, mimeMessage))); } private Mail toMail(EnqueuedItem enqueuedItem, MimeMessage mimeMessage) { @@ -151,4 +169,36 @@ public class CassandraMailQueueBrowser { .range(0, configuration.getBucketCount()) .map(BucketId::of); } + + public static class CassandraMailQueueItemView implements ManageableMailQueue.MailQueueItemView { + private final EnqueuedItem enqueuedItem; + private final Mail 
mail; + + public CassandraMailQueueItemView(Pair<EnqueuedItem, Mail> pair) { + this(pair.getLeft(), pair.getRight()); + } + + public CassandraMailQueueItemView(EnqueuedItem enqueuedItem, Mail mail) { + this.enqueuedItem = enqueuedItem; + this.mail = mail; + } + + public EnqueueId getEnqueuedId() { + return enqueuedItem.getEnqueueId(); + } + + public MimeMessagePartsId getEnqueuedPartsId() { + return enqueuedItem.getPartsId(); + } + + @Override + public Mail getMail() { + return mail; + } + + @Override + public Optional<ZonedDateTime> getNextDelivery() { + return Optional.empty(); + } + } } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java index b82de97..72b26a3 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java @@ -21,6 +21,8 @@ package org.apache.james.queue.rabbitmq.view.cassandra; import static org.apache.james.util.FunctionalUtils.negate; +import java.time.Instant; + import javax.inject.Inject; import org.apache.james.queue.api.ManageableMailQueue; @@ -33,10 +35,11 @@ import org.apache.james.queue.rabbitmq.view.cassandra.configuration.CassandraMai import org.apache.james.queue.rabbitmq.view.cassandra.configuration.EventsourcingConfigurationManagement; import org.apache.james.queue.rabbitmq.view.cassandra.model.EnqueuedItemWithSlicingContext; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; -public class CassandraMailQueueView implements MailQueueView { +public class CassandraMailQueueView implements MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> { public static class Factory 
implements MailQueueView.Factory { private final CassandraMailQueueMailStore storeHelper; @@ -91,13 +94,24 @@ public class CassandraMailQueueView implements MailQueueView { @Override public ManageableMailQueue.MailQueueIterator browse() { return new CassandraMailQueueBrowser.CassandraMailQueueIterator( - cassandraMailQueueBrowser.browse(mailQueueName) - .subscribeOn(Schedulers.elastic()) + browseReactive() .toIterable() .iterator()); } @Override + public Flux<CassandraMailQueueBrowser.CassandraMailQueueItemView> browseReactive() { + return cassandraMailQueueBrowser.browse(mailQueueName) + .subscribeOn(Schedulers.elastic()); + } + + @Override + public Flux<CassandraMailQueueBrowser.CassandraMailQueueItemView> browseOlderThanReactive(Instant olderThan) { + return cassandraMailQueueBrowser.browseOlderThan(mailQueueName, olderThan) + .subscribeOn(Schedulers.elastic()); + } + + @Override public long getSize() { return cassandraMailQueueBrowser.browseReferences(mailQueueName) .count() @@ -133,6 +147,6 @@ public class CassandraMailQueueView implements MailQueueView { @Override public Mono<Boolean> isPresent(EnqueueId id) { return cassandraMailQueueMailDelete.isDeleted(id, mailQueueName) - .map(negate()); + .map(negate()); } } diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index bf2d209..8808bfc 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -23,6 +23,7 @@ import static java.time.temporal.ChronoUnit.HOURS; import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally; import static org.apache.james.backends.cassandra.Scenario.Builder.fail; import static 
org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty; +import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; import static org.apache.james.queue.api.Mails.defaultMail; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; @@ -75,11 +76,12 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.ArgumentCaptor; import com.github.fge.lambdas.Throwing; - import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.OutboundMessage; +import reactor.rabbitmq.Sender; class RabbitMQMailQueueTest { private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); @@ -132,7 +134,7 @@ class RabbitMQMailQueueTest { } @Override - public MailQueue getMailQueue() { + public RabbitMQMailQueue getMailQueue() { return mailQueue; } @@ -288,6 +290,177 @@ class RabbitMQMailQueueTest { .containsExactly(name1, name2, name3); } + @Test + void messagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessed() throws Exception { + clock.setInstant(Instant.now().minus(Duration.ofHours(2))); + String name1 = "myMail1"; + String name2 = "myMail2"; + String name3 = "myMail3"; + Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue()); + + // Avoid early processing and prefetching + Sender sender = rabbitMQExtension.getSender(); + + suspendDequeuing(sender); + + getMailQueue().enQueue(defaultMail() + .name(name1) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name2) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name3) + .build()); + + resumeDequeuing(sender); + assertThat(getMailQueue() + .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1))) + .collectList() + .block()) + .containsExactlyInAnyOrder(name1, name2, name3); + + List<MailQueue.MailQueueItem> items = 
dequeueFlux.take(Duration.ofSeconds(10)).collectList().block(); + + assertThat(items) + .extracting(item -> item.getMail().getName()) + .containsExactlyInAnyOrder(name1, name2, name3); + } + + @Test + void onlyOldMessagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessed() throws Exception { + clock.setInstant(Instant.now().minus(Duration.ofHours(2))); + String name1 = "myMail1"; + String name2 = "myMail2"; + String name3 = "myMail3"; + Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue()); + + // Avoid early processing and prefetching + Sender sender = rabbitMQExtension.getSender(); + + suspendDequeuing(sender); + + getMailQueue().enQueue(defaultMail() + .name(name1) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name2) + .build()); + + clock.setInstant(Instant.now()); + getMailQueue().enQueue(defaultMail() + .name(name3) + .build()); + + resumeDequeuing(sender); + assertThat(getMailQueue() + .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1))) + .collectList() + .block()) + .containsExactlyInAnyOrder(name1, name2); + + List<MailQueue.MailQueueItem> items = dequeueFlux.take(Duration.ofSeconds(10)).collectList().block(); + + assertThat(items) + .extracting(item -> item.getMail().getName()) + .containsExactlyInAnyOrder(name1, name2); + } + + @Test + void messagesShouldBeProcessedAfterTwoMailsReprocessing() throws Exception { + clock.setInstant(Instant.now().minus(Duration.ofHours(2))); + String name1 = "myMail1"; + String name2 = "myMail2"; + String name3 = "myMail3"; + Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue()); + + // Avoid early processing and prefetching + Sender sender = rabbitMQExtension.getSender(); + + suspendDequeuing(sender); + + getMailQueue().enQueue(defaultMail() + .name(name1) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name2) + .build()); + + getMailQueue().enQueue(defaultMail() + .name(name3) + .build()); + + 
assertThat(getMailQueue() + .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1))) + .collectList() + .block()) + .containsExactlyInAnyOrder(name1, name2, name3); + resumeDequeuing(sender); + assertThat(getMailQueue() + .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1))) + .collectList() + .block()) + .containsExactlyInAnyOrder(name1, name2, name3); + + List<MailQueue.MailQueueItem> items = dequeueFlux.take(Duration.ofSeconds(10)).collectList().block(); + + assertThat(items) + .extracting(item -> item.getMail().getName()) + .containsExactlyInAnyOrder(name1, name2, name3); + } + + @Test + void messagesShouldBeProcessedAfterNotPublishedMailsHaveBeenReprocessedAndNewMessagesShouldNotBeLost() throws Exception { + clock.setInstant(Instant.now().minus(Duration.ofHours(2))); + String name1 = "myMail1"; + String name2 = "myMail2"; + String name3 = "myMail3"; + Flux<MailQueue.MailQueueItem> dequeueFlux = Flux.from(getMailQueue().deQueue()); + + // Avoid early processing and prefetching + Sender sender = rabbitMQExtension.getSender(); + + suspendDequeuing(sender); + //mail sent when rabbit is down + getMailQueue().enQueue(defaultMail() + .name(name1) + .build()); + resumeDequeuing(sender); + + //mail sent when rabbit is up again and before rebuild + clock.setInstant(Instant.now()); + getMailQueue().enQueue(defaultMail() + .name(name3) + .build()); + + Flux.merge(Mono.fromCallable(() -> { + //mail sent concurrently with rebuild + getMailQueue().enQueue(defaultMail() + .name(name2) + .build()); + return true; + + }), Mono.fromRunnable(() -> + assertThat(getMailQueue() + .republishNotProcessedMails(Instant.now().minus(Duration.ofHours(1))) + .collectList() + .block()) + .containsOnly(name1) + )) + .then() + .block(Duration.ofSeconds(10)); + + List<MailQueue.MailQueueItem> items = dequeueFlux.take(Duration.ofSeconds(10)).collectList().block(); + + assertThat(items) + .extracting(item -> item.getMail().getName()) + .containsExactlyInAnyOrder(name1, 
name2, name3); + } + private void enqueueSomeMails(Function<Integer, String> namePattern, int emailCount) { IntStream.rangeClosed(1, emailCount) .forEach(Throwing.intConsumer(i -> enQueue(defaultMail() @@ -485,6 +658,22 @@ class RabbitMQMailQueueTest { Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS) .untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1)); } + + private void resumeDequeuing(Sender sender) { + sender.bindQueue(getMailQueueBindingSpecification()).block(); + } + + private void suspendDequeuing(Sender sender) { + sender.unbindQueue(getMailQueueBindingSpecification()).block(); + } + + private BindingSpecification getMailQueueBindingSpecification() { + MailQueueName mailQueueName = MailQueueName.fromString(getMailQueue().getName().asString()); + return BindingSpecification.binding() + .exchange(mailQueueName.toRabbitExchangeName().asString()) + .queue(mailQueueName.toWorkQueueName().asString()) + .routingKey(EMPTY_ROUTING_KEY); + } } @Nested diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index c40fb11..2917b35 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -35,6 +35,7 @@ import org.apache.james.queue.api.MailQueueFactoryContract; import org.apache.james.queue.api.RawMailQueueItemDecoratorFactory; import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration; import org.apache.james.queue.rabbitmq.view.api.MailQueueView; +import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueBrowser; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; @@ -52,7 
+53,7 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM void setup() throws Exception { MimeMessageStore.Factory mimeMessageStoreFactory = mock(MimeMessageStore.Factory.class); MailQueueView.Factory mailQueueViewFactory = mock(MailQueueView.Factory.class); - MailQueueView mailQueueView = mock(MailQueueView.class); + MailQueueView<CassandraMailQueueBrowser.CassandraMailQueueItemView> mailQueueView = mock(MailQueueView.class); when(mailQueueViewFactory.create(any())) .thenReturn(mailQueueView); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org