This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit aa10262880e8081d1b8b2650f015c970fafd9409 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Fri Apr 14 09:58:11 2023 +0700 JAMES-3822 MemoryCacheableMailQueue should use a clock to manage delays This enables writing tests on the delay logic. --- .../james/queue/memory/MemoryMailQueueFactory.java | 38 ++++++++++++++-------- .../queue/memory/MemoryCacheableMailQueueTest.java | 4 ++- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java index 73bf35f1d7..b11aaab97a 100644 --- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java +++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java @@ -21,6 +21,7 @@ package org.apache.james.queue.memory; import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY; +import java.time.Clock; import java.time.DateTimeException; import java.time.Duration; import java.time.Instant; @@ -69,11 +70,17 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF private final ConcurrentHashMap<MailQueueName, MemoryCacheableMailQueue> mailQueues; private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory; + private final Clock clock; @Inject - public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) { + public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, Clock clock) { this.mailQueues = new ConcurrentHashMap<>(); this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory; + this.clock = clock; + } + + public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) { + this(mailQueueItemDecoratorFactory, Clock.systemUTC()); } @PreDestroy @@ -98,7 +105,7 @@ public class 
MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF @Override public MemoryCacheableMailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) { - MemoryCacheableMailQueue queue = mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory)); + MemoryCacheableMailQueue queue = mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory, clock)); queue.reference(); return queue; } @@ -110,8 +117,10 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF private final MailQueueName name; private final Flux<MailQueueItem> flux; private final Scheduler scheduler; + private final Clock clock; - public MemoryCacheableMailQueue(MailQueueName name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) { + public MemoryCacheableMailQueue(MailQueueName name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, Clock clock) { + this.clock = clock; this.mailItems = new DelayQueue<>(); this.inProcessingMailItems = new LinkedBlockingDeque<>(); this.name = name; @@ -120,6 +129,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF try { sink.success(mailItems.take()); } catch (InterruptedException e) { + sink.error(e); Thread.currentThread().interrupt(); } }) @@ -155,7 +165,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF public void enQueue(Mail mail, Duration delay) throws MailQueueException { ZonedDateTime nextDelivery = calculateNextDelivery(delay); try { - mailItems.put(new MemoryMailQueueItem(cloneMail(mail), this, nextDelivery)); + mailItems.put(new MemoryMailQueueItem(cloneMail(mail), this, clock, nextDelivery)); } catch (MessagingException e) { throw new MailQueueException("Error while copying mail " + mail.getName(), e); } @@ -169,13 +179,13 @@ public class MemoryMailQueueFactory implements 
MailQueueFactory<MemoryMailQueueF private ZonedDateTime calculateNextDelivery(Duration delay) { if (!delay.isNegative()) { try { - return ZonedDateTime.now().plus(delay); + return ZonedDateTime.now(clock).plus(delay); } catch (DateTimeException | ArithmeticException e) { return Instant.ofEpochMilli(Long.MAX_VALUE).atZone(ZoneId.of("UTC")); } } - return ZonedDateTime.now(); + return ZonedDateTime.now(clock); } @Override @@ -198,7 +208,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF return flux; } - public Mail getLastMail() throws MailQueueException, InterruptedException { + public Mail getLastMail() { MemoryMailQueueItem maybeItem = Iterables.getLast(mailItems, null); if (maybeItem == null) { return null; @@ -207,7 +217,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF } @Override - public long getSize() throws MailQueueException { + public long getSize() { return mailItems.size() + inProcessingMailItems.size(); } @@ -224,14 +234,14 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF } @Override - public long clear() throws MailQueueException { + public long clear() { int size = mailItems.size(); mailItems.clear(); return size; } @Override - public long remove(Type type, String value) throws MailQueueException { + public long remove(Type type, String value) { ImmutableList<MemoryMailQueueItem> toBeRemoved = mailItems.stream() .filter(item -> shouldRemove(item, type, value)) .collect(ImmutableList.toImmutableList()); @@ -261,7 +271,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF } @Override - public MailQueueIterator browse() throws MailQueueException { + public MailQueueIterator browse() { Iterator<DefaultMailQueueItemView> underlying = ImmutableList.copyOf(mailItems) .stream() .map(item -> new DefaultMailQueueItemView(item.getMail(), item.delivery)) @@ -305,11 +315,13 @@ public class MemoryMailQueueFactory implements 
MailQueueFactory<MemoryMailQueueF public static class MemoryMailQueueItem implements MailQueue.MailQueueItem, Delayed { private final Mail mail; private final MemoryCacheableMailQueue queue; + private final Clock clock; private final ZonedDateTime delivery; - public MemoryMailQueueItem(Mail mail, MemoryCacheableMailQueue queue, ZonedDateTime delivery) { + public MemoryMailQueueItem(Mail mail, MemoryCacheableMailQueue queue, Clock clock, ZonedDateTime delivery) { this.mail = mail; this.queue = queue; + this.clock = clock; this.delivery = delivery; } @@ -329,7 +341,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF @Override public long getDelay(TimeUnit unit) { try { - return ZonedDateTime.now().until(delivery, Temporals.chronoUnit(unit)); + return ZonedDateTime.now(clock).until(delivery, Temporals.chronoUnit(unit)); } catch (ArithmeticException e) { return Long.MAX_VALUE; } diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java index a2fb680ab6..218d28d954 100644 --- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java +++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java @@ -22,6 +22,8 @@ package org.apache.james.queue.memory; import static org.apache.james.queue.api.Mails.defaultMail; import static org.assertj.core.api.Assertions.assertThat; +import java.time.Clock; + import org.apache.james.queue.api.DelayedManageableMailQueueContract; import org.apache.james.queue.api.MailQueue; import org.apache.james.queue.api.MailQueueName; @@ -37,7 +39,7 @@ public class MemoryCacheableMailQueueTest implements DelayedManageableMailQueueC @BeforeEach public void setUp() { - mailQueue = new MemoryMailQueueFactory.MemoryCacheableMailQueue(MailQueueName.of("test"), new 
RawMailQueueItemDecoratorFactory()); + mailQueue = new MemoryMailQueueFactory.MemoryCacheableMailQueue(MailQueueName.of("test"), new RawMailQueueItemDecoratorFactory(), Clock.systemUTC()); } @AfterEach --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org For additional commands, e-mail: notifications-help@james.apache.org