This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 8cec9479c7c60fae3957975b3c4fc34e3390fde7
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Fri Nov 10 09:13:53 2023 +0100

    JAMES-3955 Applicative timeouts before Rabbit timeouts
---
 .../src/main/java/org/apache/james/events/ListenerExecutor.java  | 9 ++++++++-
 .../src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java  | 6 +++++-
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java b/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java
index 39cae87d07..2eb14cc97a 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java
@@ -21,6 +21,8 @@ package org.apache.james.events;
 
 import static org.apache.james.events.EventBus.Metrics.timerName;
 
+import java.time.Duration;
+
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.util.MDCBuilder;
 import org.apache.james.util.ReactorUtils;
@@ -28,6 +30,10 @@ import org.apache.james.util.ReactorUtils;
 import reactor.core.publisher.Mono;
 
 class ListenerExecutor {
+    // JAMES-3955 RabbitMQ handles timeout by closing channels thus
+    // causing event consumption to halt. We thus need to handle timeout beforehand.
+    private static final Duration TIMEOUT = Duration.ofMinutes(10);
+
     private final MetricFactory metricFactory;
 
     ListenerExecutor(MetricFactory metricFactory) {
@@ -38,7 +44,8 @@ class ListenerExecutor {
         if (listener.isHandling(event)) {
             return Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener),
                 Mono.from(listener.reactiveEvent(event))
-                    .contextWrite(ReactorUtils.context("ListenerExecutor", mdc(listener, mdcBuilder, event)))));
+                    .contextWrite(ReactorUtils.context("ListenerExecutor", mdc(listener, mdcBuilder, event)))
+                    .timeout(TIMEOUT)));
         }
         return Mono.empty();
     }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 98ed284354..b5eee6ba93 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -21,6 +21,7 @@ package org.apache.james.queue.rabbitmq;
 
 import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX;
 
+import java.time.Duration;
 import java.util.function.Consumer;
 
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
@@ -50,6 +51,9 @@ import reactor.rabbitmq.Receiver;
 class Dequeuer {
     private static final Logger LOGGER = LoggerFactory.getLogger(Dequeuer.class);
     private static final boolean REQUEUE = true;
+    // JAMES-3955 RabbitMQ handles timeout by closing channels thus
+    // causing event consumption to halt. We thus need to handle timeout beforehand.
+    private static final Duration TIMEOUT = Duration.ofMinutes(15);
 
     private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
 
@@ -133,7 +137,7 @@ class Dequeuer {
                 LOGGER.error("Failed to load email, requeue corresponding message", e);
                 response.nack(REQUEUE);
                 return Mono.empty();
-            });
+            }).timeout(TIMEOUT);
     }
 
     private ThrowingConsumer<MailQueue.MailQueueItem.CompletionStatus> ack(AcknowledgableDelivery response, MailWithEnqueueId mailWithEnqueueId) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to