This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a046b282c439da965f8582f199cca11968ba7bc5
Author: Rene Cordier <rcord...@linagora.com>
AuthorDate: Wed Feb 28 15:04:00 2024 +0700

    JAMES-4027 Make all RabbitMQ queues quorum queues when the quorum option is enabled
---
 .../main/java/org/apache/james/backends/rabbitmq/Constants.java    | 1 -
 .../org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java  | 4 ++--
 .../src/main/java/org/apache/james/events/EventDispatcher.java     | 4 ++--
 .../src/main/java/org/apache/james/events/GroupRegistration.java   | 3 +--
 .../java/org/apache/james/events/GroupRegistrationHandler.java     | 3 +--
 .../main/java/org/apache/james/events/KeyReconnectionHandler.java  | 3 +--
 .../main/java/org/apache/james/events/KeyRegistrationHandler.java  | 3 +--
 .../mailbox/DistributedDeletedMessageVaultDeletionCallback.java    | 5 ++---
 .../org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java  | 7 +++----
 .../eventsourcing/distributed/RabbitMQTerminationSubscriber.java   | 2 +-
 .../james/task/eventsourcing/distributed/RabbitMQWorkQueue.java    | 5 ++---
 11 files changed, 16 insertions(+), 24 deletions(-)

diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
index a63809f7aa..d56b8846b3 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java
@@ -26,7 +26,6 @@ import com.rabbitmq.client.AMQP;
 public interface Constants {
     boolean DURABLE = true;
     boolean AUTO_DELETE = true;
-    boolean ALLOW_QUORUM = true;
     boolean EXCLUSIVE = true;
     boolean NO_LOCAL = true;
 
diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
index 092804891e..2cd50d1f23 100644
--- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
+++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java
@@ -801,9 +801,9 @@ public class RabbitMQConfiguration {
         return sslConfiguration;
     }
 
-    public QueueArguments.Builder workQueueArgumentsBuilder(boolean 
allowQuorum) {
+    public QueueArguments.Builder workQueueArgumentsBuilder() {
         QueueArguments.Builder builder = QueueArguments.builder();
-        if (allowQuorum && useQuorumQueues) {
+        if (useQuorumQueues) {
             
builder.quorumQueue().replicationFactor(quorumQueueReplicationFactor);
             quorumQueueDeliveryLimit.ifPresent(builder::deliveryLimit);
         }
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
index 99ffad1d77..fb7dc8a5d7 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java
@@ -25,7 +25,6 @@ import static 
org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backends.rabbitmq.QueueArguments.NO_ARGUMENTS;
 import static org.apache.james.events.RabbitMQEventBus.EVENT_BUS_ID;
 
 import java.time.Duration;
@@ -101,7 +100,8 @@ public class EventDispatcher {
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
-                .arguments(NO_ARGUMENTS)),
+                .arguments(configuration.workQueueArgumentsBuilder()
+                    .build())),
             sender.bind(BindingSpecification.binding()
                 .exchange(namingStrategy.deadLetterExchange())
                 .queue(namingStrategy.deadLetterQueue().getName())
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
index f440d72745..15c561a18e 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.events;
 
-import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM;
 import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
@@ -133,7 +132,7 @@ class GroupRegistration implements Registration {
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
-                
.arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
+                .arguments(configuration.workQueueArgumentsBuilder()
                     .deadLetter(namingStrategy.deadLetterExchange())
                     .build()));
     }
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
index 63818517d2..44d19b88f1 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.events;
 
-import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM;
 import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
@@ -109,7 +108,7 @@ class GroupRegistrationHandler {
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
-                
.arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
+                .arguments(configuration.workQueueArgumentsBuilder()
                     .deadLetter(namingStrategy.deadLetterExchange())
                     .build()),
             BindingSpecification.binding()
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
index 93b98d3bc7..2a09e77bb8 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.events;
 
-import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM;
 import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
@@ -56,7 +55,7 @@ public class KeyReconnectionHandler implements 
SimpleConnectionPool.Reconnection
     public Publisher<Void> handleReconnection(Connection connection) {
         return Mono.fromRunnable(() -> {
             try (Channel channel = connection.createChannel()) {
-                QueueArguments.Builder builder = 
configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM);
+                QueueArguments.Builder builder = 
configuration.workQueueArgumentsBuilder();
                 configuration.getQueueTTL().ifPresent(builder::queueTTL);
                 
channel.queueDeclare(namingStrategy.queueName(eventBusId).asString(), DURABLE, 
!EXCLUSIVE, AUTO_DELETE, builder.build());
             } catch (Exception e) {
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
index 8b32e11db1..8f31c2f0c5 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.events;
 
-import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM;
 import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
 import static org.apache.james.events.RabbitMQEventBus.EVENT_BUS_ID;
@@ -123,7 +122,7 @@ class KeyRegistrationHandler {
     }
 
     private void declareQueue(Sender sender) {
-        QueueArguments.Builder builder = 
configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM);
+        QueueArguments.Builder builder = 
configuration.workQueueArgumentsBuilder();
         configuration.getQueueTTL().ifPresent(builder::queueTTL);
         sender.declareQueue(
             QueueSpecification.queue(registrationQueue.asString())
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
index 14ecde514b..e0a7a127aa 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
@@ -20,7 +20,6 @@
 package org.apache.james.modules.mailbox;
 
 import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
-import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM;
 import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
@@ -198,14 +197,14 @@ public class 
DistributedDeletedMessageVaultDeletionCallback implements DeleteMes
                     .durable(DURABLE)
                     .exclusive(!EXCLUSIVE)
                     .autoDelete(!AUTO_DELETE)
-                    
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
+                    
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder()
                         .deadLetter(DEAD_LETTER)
                         .build())),
                 sender.declareQueue(QueueSpecification.queue(QUEUE)
                     .durable(DURABLE)
                     .exclusive(!EXCLUSIVE)
                     .autoDelete(!AUTO_DELETE)
-                    
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
+                    
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder()
                         .deadLetter(DEAD_LETTER)
                         .build())),
                 sender.bind(BindingSpecification.binding()
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index 7cfe554f8b..d55b0296ee 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -19,12 +19,10 @@
 
 package org.apache.james.queue.rabbitmq;
 
-import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM;
 import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
 import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backends.rabbitmq.QueueArguments.NO_ARGUMENTS;
 import static 
org.apache.james.queue.api.MailQueue.QUEUE_SIZE_METRIC_NAME_PREFIX;
 
 import java.time.Clock;
@@ -168,14 +166,15 @@ public class RabbitMQMailQueueFactory implements 
MailQueueFactory<RabbitMQMailQu
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
-                
.arguments(configuration.workQueueArgumentsBuilder(ALLOW_QUORUM)
+                .arguments(configuration.workQueueArgumentsBuilder()
                     .deadLetter(mailQueueName.toDeadLetterExchangeName())
                     .build())),
             
sender.declareQueue(QueueSpecification.queue(mailQueueName.toDeadLetterQueueName())
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
-                .arguments(NO_ARGUMENTS)),
+                .arguments(configuration.workQueueArgumentsBuilder()
+                    .build())),
             sender.bind(BindingSpecification.binding()
                 .exchange(mailQueueName.toRabbitExchangeName().asString())
                 .queue(mailQueueName.toWorkQueueName().asString())
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
index 58a68517da..e7232a7b5e 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java
@@ -84,7 +84,7 @@ public class RabbitMQTerminationSubscriber implements 
TerminationSubscriber, Sta
 
     public void start() {
         
sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block();
-        QueueArguments.Builder builder = QueueArguments.builder();
+        QueueArguments.Builder builder = 
rabbitMQConfiguration.workQueueArgumentsBuilder();
         rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL);
         
sender.declare(QueueSpecification.queue(queueName.asString()).durable(!DURABLE).autoDelete(!AUTO_DELETE).arguments(builder.build())).block();
         sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, 
queueName.asString())).block();
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
index af78c25c31..acdcfbc692 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java
@@ -20,7 +20,6 @@
 package org.apache.james.task.eventsourcing.distributed;
 
 import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
-import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM;
 import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
 import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
 import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
@@ -126,7 +125,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
         Mono<AMQP.Queue.DeclareOk> declareQueue = sender
             .declare(QueueSpecification.queue(QUEUE_NAME)
                 .durable(true)
-                
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(ALLOW_QUORUM)
+                .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder()
                     .singleActiveConsumer()
                     
.consumerTimeout(rabbitMQConfiguration.getTaskQueueConsumerTimeout().toMillis())
                     .build()))
@@ -197,7 +196,7 @@ public class RabbitMQWorkQueue implements WorkQueue {
 
     private void listenToCancelRequests() {
         
sender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block();
-        QueueArguments.Builder builder = QueueArguments.builder();
+        QueueArguments.Builder builder = 
rabbitMQConfiguration.workQueueArgumentsBuilder();
         rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL);
         QueueSpecification specification = 
QueueSpecification.queue(cancelRequestQueueName.asString())
             .durable(!DURABLE)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to