JAMES-2544 Move enqueuedTime to MailQueueView level for storing mails
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6be6b165 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6be6b165 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6be6b165 Branch: refs/heads/master Commit: 6be6b16502634d5d6e02438ef9464f719eaa041b Parents: 3a53806 Author: duc <dt...@linagora.com> Authored: Mon Sep 17 14:55:26 2018 +0700 Committer: Benoit Tellier <btell...@linagora.com> Committed: Wed Sep 26 09:22:06 2018 +0700 ---------------------------------------------------------------------- .../apache/james/queue/rabbitmq/Dequeuer.java | 12 +++++-- .../apache/james/queue/rabbitmq/Enqueuer.java | 10 +++++- .../james/queue/rabbitmq/RabbitMQMailQueue.java | 26 ++++++++------ .../queue/rabbitmq/view/api/MailQueueView.java | 3 +- .../cassandra/CassandraMailQueueMailStore.java | 8 ++--- .../view/cassandra/CassandraMailQueueView.java | 5 +-- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 36 +++++--------------- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 3 +- 8 files changed, 53 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/6be6b165/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index 7b81bb6..81f4c61 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -30,6 +30,7 @@ import java.util.function.Function; import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; +import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.mailet.Mail; import com.github.fge.lambdas.Throwing; @@ -56,7 +57,7 @@ class Dequeuer { } @Override - public void done(boolean success) throws MailQueue.MailQueueException { + public void done(boolean success) { ack.accept(success); } } @@ -68,16 +69,20 @@ class Dequeuer { private final Function<MailReferenceDTO, Mail> mailLoader; private final Metric dequeueMetric; private final MailReferenceSerializer mailReferenceSerializer; + private final MailQueueView mailQueueView; - Dequeuer(MailQueueName name, RabbitClient rabbitClient, Function<MailReferenceDTO, Mail> mailLoader, MailReferenceSerializer serializer, MetricFactory metricFactory) { + Dequeuer(MailQueueName name, RabbitClient rabbitClient, Function<MailReferenceDTO, Mail> mailLoader, + MailReferenceSerializer serializer, MetricFactory metricFactory, + MailQueueView mailQueueView) { this.name = name; this.rabbitClient = rabbitClient; this.mailLoader = mailLoader; this.mailReferenceSerializer = serializer; + this.mailQueueView = mailQueueView; this.dequeueMetric = metricFactory.generate(DEQUEUED_METRIC_NAME_PREFIX + name.asString()); } - MailQueue.MailQueueItem deQueue() throws MailQueue.MailQueueException { + MailQueue.MailQueueItem deQueue() { return pollChannel() .thenApply(Throwing.function(this::loadItem).sneakyThrow()) .join(); @@ -95,6 +100,7 @@ class Dequeuer { if (success) { dequeueMetric.increment(); rabbitClient.ack(deliveryTag); + mailQueueView.deleteMail(mail).join(); } else { rabbitClient.nack(deliveryTag); } http://git-wip-us.apache.org/repos/asf/james-project/blob/6be6b165/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java index 84d4921..f1d4ca4 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java @@ -21,6 +21,7 @@ package org.apache.james.queue.rabbitmq; import static org.apache.james.queue.api.MailQueue.ENQUEUED_METRIC_NAME_PREFIX; +import java.time.Clock; import java.util.concurrent.CompletableFuture; import javax.mail.MessagingException; @@ -31,6 +32,7 @@ import org.apache.james.blob.mail.MimeMessagePartsId; import org.apache.james.metrics.api.Metric; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.queue.api.MailQueue; +import org.apache.james.queue.rabbitmq.view.api.MailQueueView; import org.apache.mailet.Mail; import com.fasterxml.jackson.core.JsonProcessingException; @@ -42,13 +44,18 @@ class Enqueuer { private final Store<MimeMessage, MimeMessagePartsId> mimeMessageStore; private final MailReferenceSerializer mailReferenceSerializer; private final Metric enqueueMetric; + private final MailQueueView mailQueueView; + private final Clock clock; Enqueuer(MailQueueName name, RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, - MailReferenceSerializer serializer, MetricFactory metricFactory) { + MailReferenceSerializer serializer, MetricFactory metricFactory, + MailQueueView mailQueueView, Clock clock) { this.name = name; this.rabbitClient = rabbitClient; this.mimeMessageStore = mimeMessageStore; this.mailReferenceSerializer = serializer; + this.mailQueueView = mailQueueView; + this.clock = clock; this.enqueueMetric = metricFactory.generate(ENQUEUED_METRIC_NAME_PREFIX + name.asString()); } @@ -56,6 +63,7 @@ class Enqueuer { saveMail(mail) .thenAccept(Throwing.<MimeMessagePartsId>consumer(partsId -> publishReferenceToRabbit(mail, partsId)).sneakyThrow()) .thenRun(enqueueMetric::increment) + .thenCompose(any -> mailQueueView.storeMail(clock.instant(), mail)) .join(); } http://git-wip-us.apache.org/repos/asf/james-project/blob/6be6b165/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java index 818a83b..c653943 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java @@ -19,6 +19,7 @@ package org.apache.james.queue.rabbitmq; +import java.time.Clock; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -50,25 +51,29 @@ public class RabbitMQMailQueue implements ManageableMailQueue { private final MailReferenceSerializer mailReferenceSerializer; private final Function<MailReferenceDTO, Mail> mailLoader; private final MailQueueView mailQueueView; + private final Clock clock; @Inject @VisibleForTesting Factory(MetricFactory metricFactory, RabbitClient rabbitClient, Store<MimeMessage, MimeMessagePartsId> mimeMessageStore, BlobId.Factory blobIdFactory, - MailQueueView mailQueueView) { + MailQueueView mailQueueView, + Clock clock) { this.metricFactory = metricFactory; this.rabbitClient = rabbitClient; this.mimeMessageStore = mimeMessageStore; + this.mailQueueView = mailQueueView; + this.clock = clock; this.mailReferenceSerializer = new MailReferenceSerializer(); this.mailLoader = Throwing.function(new MailLoader(mimeMessageStore, blobIdFactory)::load).sneakyThrow(); - this.mailQueueView = mailQueueView; } RabbitMQMailQueue create(MailQueueName mailQueueName) { return new RabbitMQMailQueue(metricFactory, mailQueueName, - new Enqueuer(mailQueueName, rabbitClient, mimeMessageStore, mailReferenceSerializer, metricFactory), - new Dequeuer(mailQueueName, rabbitClient, mailLoader, mailReferenceSerializer, metricFactory), - mailQueueView); + new Enqueuer(mailQueueName, rabbitClient, mimeMessageStore, mailReferenceSerializer, + metricFactory, mailQueueView, clock), + new Dequeuer(mailQueueName, rabbitClient, mailLoader, mailReferenceSerializer, + metricFactory, mailQueueView), mailQueueView); } } @@ -79,13 +84,12 @@ public class RabbitMQMailQueue implements ManageableMailQueue { private final MailQueueView mailQueueView; RabbitMQMailQueue(MetricFactory metricFactory, MailQueueName name, - Enqueuer enqueuer, Dequeuer dequeuer, MailQueueView mailQueueView) { - + Enqueuer enqueuer, Dequeuer dequeuer, + MailQueueView mailQueueView) { + this.metricFactory = metricFactory; this.name = name; this.enqueuer = enqueuer; this.dequeuer = dequeuer; - - this.metricFactory = metricFactory; this.mailQueueView = mailQueueView; } @@ -103,13 +107,13 @@ public class RabbitMQMailQueue implements ManageableMailQueue { } @Override - public void enQueue(Mail mail) throws MailQueueException { + public void enQueue(Mail mail) { metricFactory.runPublishingTimerMetric(ENQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString(), Throwing.runnable(() -> enqueuer.enQueue(mail)).sneakyThrow()); } @Override - public MailQueueItem deQueue() throws MailQueueException { + public MailQueueItem deQueue() { return metricFactory.runPublishingTimerMetric(DEQUEUED_TIMER_METRIC_NAME_PREFIX + name.asString(), Throwing.supplier(dequeuer::deQueue).sneakyThrow()); } http://git-wip-us.apache.org/repos/asf/james-project/blob/6be6b165/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java index 0d70239..10ad42d 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/api/MailQueueView.java @@ -19,6 +19,7 @@ package org.apache.james.queue.rabbitmq.view.api; +import java.time.Instant; import java.util.concurrent.CompletableFuture; import org.apache.james.queue.api.ManageableMailQueue; @@ -26,7 +27,7 @@ import org.apache.mailet.Mail; public interface MailQueueView { - CompletableFuture<Void> storeMail(Mail mail); + CompletableFuture<Void> storeMail(Instant enqueuedTime, Mail mail); CompletableFuture<Void> deleteMail(Mail mail); http://git-wip-us.apache.org/repos/asf/james-project/blob/6be6b165/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java index ed696f2..a243407 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueMailStore.java @@ -53,8 +53,8 @@ class CassandraMailQueueMailStore { this.initialInserted = ConcurrentHashMap.newKeySet(); } - CompletableFuture<Void> storeMailInEnqueueTable(Mail mail, MailQueueName mailQueueName) { - EnqueuedMail enqueuedMail = convertToEnqueuedMail(mail, mailQueueName); + CompletableFuture<Void> storeMailInEnqueueTable(Mail mail, MailQueueName mailQueueName, Instant enqueuedTime) { + EnqueuedMail enqueuedMail = convertToEnqueuedMail(mail, mailQueueName, enqueuedTime); return enqueuedMailsDao.insert(enqueuedMail) .thenCompose(any -> initBrowseStartIfNeeded(mailQueueName, enqueuedMail.getTimeRangeStart())); @@ -73,12 +73,12 @@ class CassandraMailQueueMailStore { .thenAccept(any -> initialInserted.add(mailQueueName)); } - private EnqueuedMail convertToEnqueuedMail(Mail mail, MailQueueName mailQueueName) { + private EnqueuedMail convertToEnqueuedMail(Mail mail, MailQueueName mailQueueName, Instant enqueuedTime) { return EnqueuedMail.builder() .mail(mail) .bucketId(computedBucketId(mail)) .timeRangeStart(currentSliceStartInstant()) - .enqueuedTime(Instant.now()) + .enqueuedTime(enqueuedTime) .mailKey(MailKey.fromMail(mail)) .mailQueueName(mailQueueName) .build(); http://git-wip-us.apache.org/repos/asf/james-project/blob/6be6b165/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java index ee309e6..ad37b5b 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/CassandraMailQueueView.java @@ -19,6 +19,7 @@ package org.apache.james.queue.rabbitmq.view.cassandra; +import java.time.Instant; import java.util.concurrent.CompletableFuture; import javax.inject.Inject; @@ -66,8 +67,8 @@ public class CassandraMailQueueView implements MailQueueView { } @Override - public CompletableFuture<Void> storeMail(Mail mail) { - return storeHelper.storeMailInEnqueueTable(mail, mailQueueName); + public CompletableFuture<Void> storeMail(Instant enqueuedTime, Mail mail) { + return storeHelper.storeMailInEnqueueTable(mail, mailQueueName, enqueuedTime); } @Override http://git-wip-us.apache.org/repos/asf/james-project/blob/6be6b165/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 92b384a..f1f31be 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -66,8 +66,6 @@ import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewModu import org.apache.james.queue.rabbitmq.view.cassandra.CassandraMailQueueViewTestFactory; import org.apache.james.util.streams.Iterators; import org.apache.mailet.Mail; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -90,7 +88,6 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ private static final Instant IN_SLICE_5 = IN_SLICE_1.plus(4, HOURS); private static final Instant IN_SLICE_6 = IN_SLICE_1.plus(6, HOURS); - @RegisterExtension static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(CassandraModule.aggregateModules( CassandraBlobModule.MODULE, @@ -115,7 +112,7 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ .setPort(rabbitMQ.getAdminPort()) .build(); clock = mock(Clock.class); - when(clock.instant()).thenReturn(Instant.parse("2007-12-03T10:15:30.00Z")); + when(clock.instant()).thenReturn(IN_SLICE_1); ThreadLocalRandom random = ThreadLocalRandom.current(); MailQueueView mailQueueView = CassandraMailQueueViewTestFactory.factory(clock, random, cassandra.getConf(), cassandra.getTypesProvider(), @@ -136,26 +133,16 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory)); RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory( - metricTestSystem.getSpyMetricFactory(), - rabbitClient, - mimeMessageStore, - BLOB_ID_FACTORY, - mailQueueView); - + metricTestSystem.getSpyMetricFactory(), + rabbitClient, + mimeMessageStore, + BLOB_ID_FACTORY, + mailQueueView, + Clock.systemUTC()); RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())); mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); } - @AfterEach - void tearDown(CassandraCluster cassandra) { - cassandra.clearTables(); - } - - @AfterAll - static void tearDownClass(CassandraCluster cassandra) { - cassandra.closeCluster(); - } - @Override public MailQueue getMailQueue() { return mailQueueFactory.createQueue(SPOOL); @@ -215,20 +202,15 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ when(clock.instant()).thenReturn(IN_SLICE_6); dequeueMails(5); - - dequeueMails(3); - MailQueue.MailQueueItem item2_4 = mailQueue.deQueue(); - item2_4.done(false); - dequeueMails(1); - dequeueMails(5); + dequeueMails(3); Stream<String> names = Iterators.toStream(mailQueue.browse()) .map(ManageableMailQueue.MailQueueItemView::getMail) .map(Mail::getName); assertThat(names) - .containsExactly("2-4", "5-1", "5-2", "5-3", "5-4", "5-5"); + .containsExactly("3-4", "3-5", "5-1", "5-2", "5-3", "5-4", "5-5"); } @Disabled http://git-wip-us.apache.org/repos/asf/james-project/blob/6be6b165/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index bf7d37e..99587dd 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.mock; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.time.Clock; import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; @@ -80,7 +81,7 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); RabbitClient rabbitClient = new RabbitClient(new RabbitChannelPool(rabbitMQConnectionFactory)); - RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(new NoopMetricFactory(), rabbitClient, mimeMessageStore, BLOB_ID_FACTORY, mailQueueView); + RabbitMQMailQueue.Factory factory = new RabbitMQMailQueue.Factory(new NoopMetricFactory(), rabbitClient, mimeMessageStore, BLOB_ID_FACTORY, mailQueueView, Clock.systemUTC()); RabbitMQManagementApi mqManagementApi = new RabbitMQManagementApi(rabbitManagementUri, new RabbitMQManagementCredentials("guest", "guest".toCharArray())); mailQueueFactory = new RabbitMQMailQueueFactory(rabbitClient, mqManagementApi, factory); } --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org