This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.8.x
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/3.8.x by this push:
     new ec09ee7741 JAMES-3604 Enable to use quorum queues onto DTM work queue 
(#1714)
ec09ee7741 is described below

commit ec09ee7741282e5777b6fdf0a5a988ce9e0aa1be
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Wed Sep 6 04:15:31 2023 +0700

    JAMES-3604 Enable to use quorum queues onto DTM work queue (#1714)
---
 .../main/java/org/apache/james/backends/rabbitmq/Constants.java   | 5 +----
 .../java/org/apache/james/backends/rabbitmq/QueueArguments.java   | 7 +++++++
 .../james/task/eventsourcing/distributed/RabbitMQWorkQueue.java   | 8 +++++++-
 3 files changed, 15 insertions(+), 5 deletions(-)

diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
index 3cf7049b76..a63809f7aa 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
@@ -21,7 +21,6 @@ package org.apache.james.backends.rabbitmq;
 
 import java.util.Map;
 
-import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.AMQP;
 
 public interface Constants {
@@ -37,9 +36,7 @@ public interface Constants {
     String EMPTY_ROUTING_KEY = "";
     boolean REQUEUE = true;
 
-    String SINGLE_ACTIVE_CONSUMER_ARGUMENT = "x-single-active-consumer";
-    boolean SINGLE_ACTIVE_CONSUMER = true;
-    Map<String, Object> WITH_SINGLE_ACTIVE_CONSUMER = 
ImmutableMap.of(Constants.SINGLE_ACTIVE_CONSUMER_ARGUMENT, 
Constants.SINGLE_ACTIVE_CONSUMER);
+    Map<String, Object> WITH_SINGLE_ACTIVE_CONSUMER = 
QueueArguments.builder().singleActiveConsumer().build();
 
     String DIRECT_EXCHANGE = "direct";
 
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
index 2dd1c7ca45..8a1cb2d47a 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/QueueArguments.java
@@ -3,6 +3,8 @@ package org.apache.james.backends.rabbitmq;
 import com.google.common.collect.ImmutableMap;
 
 public class QueueArguments {
+    private static final String SINGLE_ACTIVE_CONSUMER_ARGUMENT = 
"x-single-active-consumer";
+
     public static class Builder {
         @FunctionalInterface
         public interface RequiresReplicationFactor {
@@ -30,6 +32,11 @@ public class QueueArguments {
             return this;
         }
 
+        public Builder singleActiveConsumer() {
+            arguments.put(SINGLE_ACTIVE_CONSUMER_ARGUMENT, true);
+            return this;
+        }
+
         public Builder queueTTL(long queueTTL) {
             arguments.put("x-expires", queueTTL);
             return this;
diff --git 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index b315e9bec5..5b2435d825 100644
--- 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -21,6 +21,7 @@
 package org.apache.james.task.eventsourcing.distributed;
 
 import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM;
 import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
@@ -125,7 +126,12 @@ public class RabbitMQWorkQueue implements WorkQueue {
             .declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME))
             .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));
         Mono<AMQP.Queue.DeclareOk> declareQueue = sender
-            
.declare(QueueSpecification.queue(QUEUE_NAME).durable(true).arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER))
+            .declare(QueueSpecification.queue(QUEUE_NAME)
+                .durable(true)
+                
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(ALLOW_QUORUM)
+                    .singleActiveConsumer()
+                    .build())
+                .arguments(Constants.WITH_SINGLE_ACTIVE_CONSUMER))
             .retryWhen(Retry.backoff(NUM_RETRIES, FIRST_BACKOFF));
         Mono<AMQP.Queue.BindOk> bindQueueToExchange = sender
             .bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, 
QUEUE_NAME))


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to