This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 8cec9479c7c60fae3957975b3c4fc34e3390fde7 Author: Benoit TELLIER <btell...@linagora.com> AuthorDate: Fri Nov 10 09:13:53 2023 +0100 JAMES-3955 Applicative timeouts before Rabbit timeouts --- .../src/main/java/org/apache/james/events/ListenerExecutor.java | 9 ++++++++- .../src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java | 6 +++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java b/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java index 39cae87d07..2eb14cc97a 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/ListenerExecutor.java @@ -21,6 +21,8 @@ package org.apache.james.events; import static org.apache.james.events.EventBus.Metrics.timerName; +import java.time.Duration; + import org.apache.james.metrics.api.MetricFactory; import org.apache.james.util.MDCBuilder; import org.apache.james.util.ReactorUtils; @@ -28,6 +30,10 @@ import org.apache.james.util.ReactorUtils; import reactor.core.publisher.Mono; class ListenerExecutor { + // JAMES-3955 RabbitMQ handles timeout by closing channels thus + // causing event consumption to halt. We thus need to handle timeout beforehand. + private static final Duration TIMEOUT = Duration.ofMinutes(10); + private final MetricFactory metricFactory; ListenerExecutor(MetricFactory metricFactory) { @@ -38,7 +44,8 @@ class ListenerExecutor { if (listener.isHandling(event)) { return Mono.from(metricFactory.decoratePublisherWithTimerMetric(timerName(listener), Mono.from(listener.reactiveEvent(event)) - .contextWrite(ReactorUtils.context("ListenerExecutor", mdc(listener, mdcBuilder, event))))); + .contextWrite(ReactorUtils.context("ListenerExecutor", mdc(listener, mdcBuilder, event))) + .timeout(TIMEOUT))); } return Mono.empty(); } diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java index 98ed284354..b5eee6ba93 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java @@ -21,6 +21,7 @@ package org.apache.james.queue.rabbitmq; import static org.apache.james.queue.api.MailQueue.DEQUEUED_METRIC_NAME_PREFIX; +import java.time.Duration; import java.util.function.Consumer; import org.apache.james.backends.rabbitmq.ReceiverProvider; @@ -50,6 +51,9 @@ import reactor.rabbitmq.Receiver; class Dequeuer { private static final Logger LOGGER = LoggerFactory.getLogger(Dequeuer.class); private static final boolean REQUEUE = true; + // JAMES-3955 RabbitMQ handles timeout by closing channels thus + // causing event consumption to halt. We thus need to handle timeout beforehand. + private static final Duration TIMEOUT = Duration.ofMinutes(15); private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem { @@ -133,7 +137,7 @@ class Dequeuer { LOGGER.error("Failed to load email, requeue corresponding message", e); response.nack(REQUEUE); return Mono.empty(); - }); + }).timeout(TIMEOUT); } private ThrowingConsumer<MailQueue.MailQueueItem.CompletionStatus> ack(AcknowledgableDelivery response, MailWithEnqueueId mailWithEnqueueId) { --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org