This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a046b282c439da965f8582f199cca11968ba7bc5 Author: Rene Cordier <rcord...@linagora.com> AuthorDate: Wed Feb 28 15:04:00 2024 +0700 JAMES-4027 Make all queues on Rabbitmq quorum queue when quorum option is enabled --- .../main/java/org/apache/james/backends/rabbitmq/Constants.java | 1 - .../org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java | 4 ++-- .../src/main/java/org/apache/james/events/EventDispatcher.java | 4 ++-- .../src/main/java/org/apache/james/events/GroupRegistration.java | 3 +-- .../java/org/apache/james/events/GroupRegistrationHandler.java | 3 +-- .../main/java/org/apache/james/events/KeyReconnectionHandler.java | 3 +-- .../main/java/org/apache/james/events/KeyRegistrationHandler.java | 3 +-- .../mailbox/DistributedDeletedMessageVaultDeletionCallback.java | 5 ++--- .../org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java | 7 +++---- .../eventsourcing/distributed/RabbitMQTerminationSubscriber.java | 2 +- .../james/task/eventsourcing/distributed/RabbitMQWorkQueue.java | 5 ++--- 11 files changed, 16 insertions(+), 24 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java index a63809f7aa..d56b8846b3 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/Constants.java @@ -26,7 +26,6 @@ import com.rabbitmq.client.AMQP; public interface Constants { boolean DURABLE = true; boolean AUTO_DELETE = true; - boolean ALLOW_QUORUM = true; boolean EXCLUSIVE = true; boolean NO_LOCAL = true; diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java index 092804891e..2cd50d1f23 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQConfiguration.java @@ -801,9 +801,9 @@ public class RabbitMQConfiguration { return sslConfiguration; } - public QueueArguments.Builder workQueueArgumentsBuilder(boolean allowQuorum) { + public QueueArguments.Builder workQueueArgumentsBuilder() { QueueArguments.Builder builder = QueueArguments.builder(); - if (allowQuorum && useQuorumQueues) { + if (useQuorumQueues) { builder.quorumQueue().replicationFactor(quorumQueueReplicationFactor); quorumQueueDeliveryLimit.ifPresent(builder::deliveryLimit); } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java index 99ffad1d77..fb7dc8a5d7 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/EventDispatcher.java @@ -25,7 +25,6 @@ import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; -import static org.apache.james.backends.rabbitmq.QueueArguments.NO_ARGUMENTS; import static org.apache.james.events.RabbitMQEventBus.EVENT_BUS_ID; import java.time.Duration; @@ -101,7 +100,8 @@ public class EventDispatcher { .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) - .arguments(NO_ARGUMENTS)), + .arguments(configuration.workQueueArgumentsBuilder() + .build())), sender.bind(BindingSpecification.binding() .exchange(namingStrategy.deadLetterExchange()) .queue(namingStrategy.deadLetterQueue().getName()) diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java index f440d72745..15c561a18e 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java @@ -19,7 +19,6 @@ package org.apache.james.events; -import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM; import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; @@ -133,7 +132,7 @@ class GroupRegistration implements Registration { .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) - .arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM) + .arguments(configuration.workQueueArgumentsBuilder() .deadLetter(namingStrategy.deadLetterExchange()) .build())); } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java index 63818517d2..44d19b88f1 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java @@ -19,7 +19,6 @@ package org.apache.james.events; -import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM; import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; @@ -109,7 +108,7 @@ class GroupRegistrationHandler { .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) - .arguments(configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM) + .arguments(configuration.workQueueArgumentsBuilder() .deadLetter(namingStrategy.deadLetterExchange()) .build()), BindingSpecification.binding() diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java index 93b98d3bc7..2a09e77bb8 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyReconnectionHandler.java @@ -19,7 +19,6 @@ package org.apache.james.events; -import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM; import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; @@ -56,7 +55,7 @@ public class KeyReconnectionHandler implements SimpleConnectionPool.Reconnection public Publisher<Void> handleReconnection(Connection connection) { return Mono.fromRunnable(() -> { try (Channel channel = connection.createChannel()) { - QueueArguments.Builder builder = configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM); + QueueArguments.Builder builder = configuration.workQueueArgumentsBuilder(); configuration.getQueueTTL().ifPresent(builder::queueTTL); channel.queueDeclare(namingStrategy.queueName(eventBusId).asString(), DURABLE, !EXCLUSIVE, AUTO_DELETE, builder.build()); } catch (Exception e) { diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java index 8b32e11db1..8f31c2f0c5 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java @@ -19,7 +19,6 @@ package org.apache.james.events; -import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM; import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; import static org.apache.james.events.RabbitMQEventBus.EVENT_BUS_ID; @@ -123,7 +122,7 @@ class KeyRegistrationHandler { } private void declareQueue(Sender sender) { - QueueArguments.Builder builder = configuration.workQueueArgumentsBuilder(!ALLOW_QUORUM); + QueueArguments.Builder builder = configuration.workQueueArgumentsBuilder(); configuration.getQueueTTL().ifPresent(builder::queueTTL); sender.declareQueue( QueueSpecification.queue(registrationQueue.asString()) diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java index 14ecde514b..e0a7a127aa 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java @@ -20,7 +20,6 @@ package org.apache.james.modules.mailbox; import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN; -import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM; import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; @@ -198,14 +197,14 @@ public class DistributedDeletedMessageVaultDeletionCallback implements DeleteMes .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) - .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!ALLOW_QUORUM) + .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder() .deadLetter(DEAD_LETTER) .build())), sender.declareQueue(QueueSpecification.queue(QUEUE) .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) - .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!ALLOW_QUORUM) + .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder() .deadLetter(DEAD_LETTER) .build())), sender.bind(BindingSpecification.binding() diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java index 7cfe554f8b..d55b0296ee 100644 --- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java +++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java @@ -19,12 +19,10 @@ package org.apache.james.queue.rabbitmq; -import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM; import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY; import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; -import static org.apache.james.backends.rabbitmq.QueueArguments.NO_ARGUMENTS; import static org.apache.james.queue.api.MailQueue.QUEUE_SIZE_METRIC_NAME_PREFIX; import java.time.Clock; @@ -168,14 +166,15 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) - .arguments(configuration.workQueueArgumentsBuilder(ALLOW_QUORUM) + .arguments(configuration.workQueueArgumentsBuilder() .deadLetter(mailQueueName.toDeadLetterExchangeName()) .build())), sender.declareQueue(QueueSpecification.queue(mailQueueName.toDeadLetterQueueName()) .durable(DURABLE) .exclusive(!EXCLUSIVE) .autoDelete(!AUTO_DELETE) - .arguments(NO_ARGUMENTS)), + .arguments(configuration.workQueueArgumentsBuilder() + .build())), sender.bind(BindingSpecification.binding() .exchange(mailQueueName.toRabbitExchangeName().asString()) .queue(mailQueueName.toWorkQueueName().asString()) diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java index 58a68517da..e7232a7b5e 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQTerminationSubscriber.java @@ -84,7 +84,7 @@ public class RabbitMQTerminationSubscriber implements TerminationSubscriber, Sta public void start() { sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE_NAME)).block(); - QueueArguments.Builder builder = QueueArguments.builder(); + QueueArguments.Builder builder = rabbitMQConfiguration.workQueueArgumentsBuilder(); rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL); sender.declare(QueueSpecification.queue(queueName.asString()).durable(!DURABLE).autoDelete(!AUTO_DELETE).arguments(builder.build())).block(); sender.bind(BindingSpecification.binding(EXCHANGE_NAME, ROUTING_KEY, queueName.asString())).block(); diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java index af78c25c31..acdcfbc692 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/RabbitMQWorkQueue.java @@ -20,7 +20,6 @@ package org.apache.james.task.eventsourcing.distributed; import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN; -import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM; import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; import static org.apache.james.backends.rabbitmq.Constants.REQUEUE; @@ -126,7 +125,7 @@ public class RabbitMQWorkQueue implements WorkQueue { Mono<AMQP.Queue.DeclareOk> declareQueue = sender .declare(QueueSpecification.queue(QUEUE_NAME) .durable(true) - .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(ALLOW_QUORUM) + .arguments(rabbitMQConfiguration.workQueueArgumentsBuilder() .singleActiveConsumer() .consumerTimeout(rabbitMQConfiguration.getTaskQueueConsumerTimeout().toMillis()) .build())) @@ -197,7 +196,7 @@ public class RabbitMQWorkQueue implements WorkQueue { private void listenToCancelRequests() { sender.declareExchange(ExchangeSpecification.exchange(CANCEL_REQUESTS_EXCHANGE_NAME)).block(); - QueueArguments.Builder builder = QueueArguments.builder(); + QueueArguments.Builder builder = rabbitMQConfiguration.workQueueArgumentsBuilder(); rabbitMQConfiguration.getQueueTTL().ifPresent(builder::queueTTL); QueueSpecification specification = QueueSpecification.queue(cancelRequestQueueName.asString()) .durable(!DURABLE) --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org For additional commands, e-mail: notifications-h...@james.apache.org