This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit aa10262880e8081d1b8b2650f015c970fafd9409
Author: Benoit Tellier <btell...@linagora.com>
AuthorDate: Fri Apr 14 09:58:11 2023 +0700

    JAMES-3822 MemoryCacheableMailQueue should use a clock to manage delays
    
    This enables writing tests on the delay logic.
---
 .../james/queue/memory/MemoryMailQueueFactory.java | 38 ++++++++++++++--------
 .../queue/memory/MemoryCacheableMailQueueTest.java |  4 ++-
 2 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
index 73bf35f1d7..b11aaab97a 100644
--- a/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
+++ b/server/queue/queue-memory/src/main/java/org/apache/james/queue/memory/MemoryMailQueueFactory.java
@@ -21,6 +21,7 @@ package org.apache.james.queue.memory;
 
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
+import java.time.Clock;
 import java.time.DateTimeException;
 import java.time.Duration;
 import java.time.Instant;
@@ -69,11 +70,17 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
 
     private final ConcurrentHashMap<MailQueueName, MemoryCacheableMailQueue> mailQueues;
     private final MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory;
+    private final Clock clock;
 
     @Inject
-    public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+    public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, Clock clock) {
         this.mailQueues = new ConcurrentHashMap<>();
         this.mailQueueItemDecoratorFactory = mailQueueItemDecoratorFactory;
+        this.clock = clock;
+    }
+
+    public MemoryMailQueueFactory(MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+        this(mailQueueItemDecoratorFactory, Clock.systemUTC());
     }
 
     @PreDestroy
@@ -98,7 +105,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
 
     @Override
     public MemoryCacheableMailQueue createQueue(MailQueueName name, PrefetchCount prefetchCount) {
-        MemoryCacheableMailQueue queue = mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory));
+        MemoryCacheableMailQueue queue = mailQueues.computeIfAbsent(name, mailQueueName -> new MemoryCacheableMailQueue(mailQueueName, mailQueueItemDecoratorFactory, clock));
         queue.reference();
         return queue;
     }
@@ -110,8 +117,10 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         private final MailQueueName name;
         private final Flux<MailQueueItem> flux;
         private final Scheduler scheduler;
+        private final Clock clock;
 
-        public MemoryCacheableMailQueue(MailQueueName name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory) {
+        public MemoryCacheableMailQueue(MailQueueName name, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, Clock clock) {
+            this.clock = clock;
             this.mailItems = new DelayQueue<>();
             this.inProcessingMailItems = new LinkedBlockingDeque<>();
             this.name = name;
@@ -120,6 +129,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
                 try {
                     sink.success(mailItems.take());
                 } catch (InterruptedException e) {
+                    sink.error(e);
                     Thread.currentThread().interrupt();
                 }
             })
@@ -155,7 +165,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         public void enQueue(Mail mail, Duration delay) throws MailQueueException {
             ZonedDateTime nextDelivery = calculateNextDelivery(delay);
             try {
-                mailItems.put(new MemoryMailQueueItem(cloneMail(mail), this, nextDelivery));
+                mailItems.put(new MemoryMailQueueItem(cloneMail(mail), this, clock, nextDelivery));
             } catch (MessagingException e) {
                 throw new MailQueueException("Error while copying mail " + mail.getName(), e);
             }
@@ -169,13 +179,13 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         private ZonedDateTime calculateNextDelivery(Duration delay) {
             if (!delay.isNegative()) {
                 try {
-                    return ZonedDateTime.now().plus(delay);
+                    return ZonedDateTime.now(clock).plus(delay);
                 } catch (DateTimeException | ArithmeticException e) {
                     return Instant.ofEpochMilli(Long.MAX_VALUE).atZone(ZoneId.of("UTC"));
                 }
             }
 
-            return ZonedDateTime.now();
+            return ZonedDateTime.now(clock);
         }
 
         @Override
@@ -198,7 +208,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
             return flux;
         }
 
-        public Mail getLastMail() throws MailQueueException, InterruptedException {
+        public Mail getLastMail() {
             MemoryMailQueueItem maybeItem = Iterables.getLast(mailItems, null);
             if (maybeItem == null) {
                 return null;
@@ -207,7 +217,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         }
 
         @Override
-        public long getSize() throws MailQueueException {
+        public long getSize() {
             return mailItems.size() + inProcessingMailItems.size();
         }
 
@@ -224,14 +234,14 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         }
 
         @Override
-        public long clear() throws MailQueueException {
+        public long clear() {
             int size = mailItems.size();
             mailItems.clear();
             return size;
         }
 
         @Override
-        public long remove(Type type, String value) throws MailQueueException {
+        public long remove(Type type, String value) {
             ImmutableList<MemoryMailQueueItem> toBeRemoved = mailItems.stream()
                 .filter(item -> shouldRemove(item, type, value))
                 .collect(ImmutableList.toImmutableList());
@@ -261,7 +271,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         }
 
         @Override
-        public MailQueueIterator browse() throws MailQueueException {
+        public MailQueueIterator browse() {
             Iterator<DefaultMailQueueItemView> underlying = ImmutableList.copyOf(mailItems)
                 .stream()
                 .map(item -> new DefaultMailQueueItemView(item.getMail(), item.delivery))
@@ -305,11 +315,13 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
     public static class MemoryMailQueueItem implements MailQueue.MailQueueItem, Delayed {
         private final Mail mail;
         private final MemoryCacheableMailQueue queue;
+        private final Clock clock;
         private final ZonedDateTime delivery;
 
-        public MemoryMailQueueItem(Mail mail, MemoryCacheableMailQueue queue, ZonedDateTime delivery) {
+        public MemoryMailQueueItem(Mail mail, MemoryCacheableMailQueue queue, Clock clock, ZonedDateTime delivery) {
             this.mail = mail;
             this.queue = queue;
+            this.clock = clock;
             this.delivery = delivery;
         }
 
@@ -329,7 +341,7 @@ public class MemoryMailQueueFactory implements MailQueueFactory<MemoryMailQueueF
         @Override
         public long getDelay(TimeUnit unit) {
             try {
-                return ZonedDateTime.now().until(delivery, Temporals.chronoUnit(unit));
+                return ZonedDateTime.now(clock).until(delivery, Temporals.chronoUnit(unit));
             } catch (ArithmeticException e) {
                 return Long.MAX_VALUE;
             }
diff --git a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
index a2fb680ab6..218d28d954 100644
--- a/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
+++ b/server/queue/queue-memory/src/test/java/org/apache/james/queue/memory/MemoryCacheableMailQueueTest.java
@@ -22,6 +22,8 @@ package org.apache.james.queue.memory;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.Clock;
+
 import org.apache.james.queue.api.DelayedManageableMailQueueContract;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueName;
@@ -37,7 +39,7 @@ public class MemoryCacheableMailQueueTest implements DelayedManageableMailQueueC
 
     @BeforeEach
     public void setUp() {
-        mailQueue = new MemoryMailQueueFactory.MemoryCacheableMailQueue(MailQueueName.of("test"), new RawMailQueueItemDecoratorFactory());
+        mailQueue = new MemoryMailQueueFactory.MemoryCacheableMailQueue(MailQueueName.of("test"), new RawMailQueueItemDecoratorFactory(), Clock.systemUTC());
     }
 
     @AfterEach


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to