This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch 3.8.x in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/3.8.x by this push:
     new ec09ee7741 JAMES-3604 Enable to use quorum queues onto DTM work queue (#1714)
ec09ee7741 is described below

commit ec09ee7741282e5777b6fdf0a5a988ce9e0aa1be
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Wed Sep 6 04:15:31 2023 +0700

    JAMES-3604 Enable to use quorum queues onto DTM work queue (#1714)
---
 .../main/java/org/apache/james/backends/rabbitmq/Constants.java | 5 +----
 .../java/org/apache/james/backends/rabbitmq/QueueArguments.java | 7 +++++++
 .../james/task/eventsourcing/distributed/RabbitMQWorkQueue.java | 8 +++++++-
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
index 3cf7049b76..a63809f7aa 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
@@ -21,7 +21,6 @@ package org.apache.james.backends.rabbitmq;
 
 import java.util.Map;
 
-import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.AMQP;
 
 public interface Constants {
@@ -37,9 +36,7 @@ public interface Constants {
     String EMPTY_ROUTING_KEY = "";
 
     boolean REQUEUE = true;
-    String SINGLE_ACTIVE_CONSUMER_ARGUMENT = "x-single-active-consumer";
-    boolean SINGLE_ACTIVE_CONSUMER = true;
-    Map<String, Object> WITH_SINGLE_ACTIVE_CONSUMER = ImmutableMap.of(Constants.SINGLE_ACTIVE_CONSUMER_ARGUMENT, Constants.SINGLE_ACTIVE_CONSUMER);
+    Map<String, Object> WITH_SINGLE_ACTIVE_CONSUMER = QueueArguments.builder().singleActiveConsumer().build();
 
     String DIRECT_EXCHANGE = "direct";
 
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
index 2dd1c7ca45..8a1cb2d47a 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
@@ -3,6 +3,8 @@ package org.apache.james.backends.rabbitmq;
 import com.google.common.collect.ImmutableMap;
 
 public class QueueArguments {
+    private static final String SINGLE_ACTIVE_CONSUMER_ARGUMENT = "x-single-active-consumer";
+
     public static class Builder {
         @FunctionalInterface
         public interface RequiresReplicationFactor {
@@ -30,6 +32,11 @@ public class QueueArguments {
             return this;
         }
 
+        public Builder singleActiveConsumer() {
+            arguments.put(SINGLE_ACTIVE_CONSUMER_ARGUMENT, true);
+            return this;
+        }
+
         public Builder queueTTL(long queueTTL) {
             arguments.put("x-expires", queueTTL);
             return this;
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index b315e9bec5..5b2435d825 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -21,6 +21,7 @@ package org.apache.james.task.eventsourcing.distributed;
 
 import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM;
 import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
 
@@ -125,7 +126,12 @@ public class RabbitMQWorkQueue implements WorkQueue {
             .declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME))
             .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));
         Mono<AMQP.Queue.DeclareOk> declareQueue = sender
-            .declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER))
+            .declare(QueueSpecification.queue(QUEUE_NAME)
+                .durable(true)
+                .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(ALLOW_QUORUM)
+                    .singleActiveConsumer()
+                    .build())
+                .arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER))
             .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));
         Mono<AMQP.Queue.BindOk> bindQueueToExchange = sender
             .bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, QUEUE_NAME))

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org