This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5cb5093d3cf416de7f3dafa1048e66df51d93105 Author: Benoit TELLIER <[email protected]> AuthorDate: Wed Jan 7 15:20:29 2026 +0100 JAMES-4159 RabbitMQEventBus: Group configuration objects together This is already beneficial in terms of lines of code and will be even more so as we add new configuration objects --- .../apache/james/events/GroupConsumerRetry.java | 4 +-- .../org/apache/james/events/GroupRegistration.java | 26 ++++++++----------- .../james/events/GroupRegistrationHandler.java | 26 ++++++++----------- .../james/events/KeyRegistrationHandler.java | 29 +++++++++------------- .../org/apache/james/events/RabbitMQEventBus.java | 25 ++++++++++--------- .../org/apache/james/events/NetworkErrorTest.java | 9 +++---- ...RabbitMQEventBusDeadLetterQueueUpgradeTest.java | 5 ++-- .../apache/james/events/RabbitMQEventBusTest.java | 7 +++--- .../RabbitMQEventBusUsingQuorumQueueTest.java | 6 ++--- .../rabbitmq/host/RabbitMQEventBusHostSystem.java | 5 ++-- .../event/ContentDeletionEventBusModule.java | 4 +-- .../james/modules/event/JMAPEventBusModule.java | 4 +-- .../james/modules/event/MailboxEventBusModule.java | 4 +-- 13 files changed, 68 insertions(+), 86 deletions(-) diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupConsumerRetry.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupConsumerRetry.java index ff37624277..bcdedced35 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupConsumerRetry.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupConsumerRetry.java @@ -64,12 +64,10 @@ class GroupConsumerRetry { private final EventDeadLetters eventDeadLetters; private final Group group; private final EventSerializer eventSerializer; - private final RabbitMQConfiguration rabbitMQConfiguration; GroupConsumerRetry(NamingStrategy namingStrategy, Sender sender, Group group, RetryBackoffConfiguration retryBackoff, - EventDeadLetters eventDeadLetters, EventSerializer 
eventSerializer, RabbitMQConfiguration rabbitMQConfiguration) { + EventDeadLetters eventDeadLetters, EventSerializer eventSerializer) { this.sender = sender; - this.rabbitMQConfiguration = rabbitMQConfiguration; this.retryExchangeName = namingStrategy.retryExchange(group); this.retryBackoff = retryBackoff; this.eventDeadLetters = eventDeadLetters; diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java index fe7a1e317d..dc8be9a9ae 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistration.java @@ -32,7 +32,6 @@ import java.util.Objects; import java.util.Optional; import java.util.function.Predicate; -import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.util.MDCBuilder; @@ -83,30 +82,27 @@ class GroupRegistration implements Registration { private final GroupConsumerRetry retryHandler; private final WaitDelayGenerator delayGenerator; private final Group group; - private final RetryBackoffConfiguration retryBackoff; private final ListenerExecutor listenerExecutor; - private final RabbitMQConfiguration configuration; + private final RabbitMQEventBus.Configurations configurations; private Optional<Disposable> receiverSubscriber; private final ReceiverProvider receiverProvider; private Scheduler scheduler; GroupRegistration(NamingStrategy namingStrategy, ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer, - EventListener.ReactiveEventListener listener, Group group, RetryBackoffConfiguration retryBackoff, - EventDeadLetters eventDeadLetters, - Runnable unregisterGroup, ListenerExecutor 
listenerExecutor, RabbitMQConfiguration configuration) { + EventListener.ReactiveEventListener listener, Group group, EventDeadLetters eventDeadLetters, Runnable unregisterGroup, + ListenerExecutor listenerExecutor, RabbitMQEventBus.Configurations configurations) { this.namingStrategy = namingStrategy; this.channelPool = channelPool; this.eventSerializer = eventSerializer; this.listener = listener; - this.configuration = configuration; + this.configurations = configurations; this.queueName = namingStrategy.workQueue(group); this.receiverProvider = receiverProvider; - this.retryBackoff = retryBackoff; this.listenerExecutor = listenerExecutor; this.receiverSubscriber = Optional.empty(); this.unregisterGroup = unregisterGroup; - this.retryHandler = new GroupConsumerRetry(namingStrategy, sender, group, retryBackoff, eventDeadLetters, eventSerializer, configuration); - this.delayGenerator = WaitDelayGenerator.of(retryBackoff); + this.retryHandler = new GroupConsumerRetry(namingStrategy, sender, group, configurations.retryBackoff(), eventDeadLetters, eventSerializer); + this.delayGenerator = WaitDelayGenerator.of(configurations.retryBackoff()); this.group = group; } @@ -116,7 +112,7 @@ class GroupRegistration implements Registration { .of(createGroupWorkQueue() .then(retryHandler.createRetryExchange(queueName)) .then(Mono.fromCallable(this::consumeWorkQueue)) - .retryWhen(retryBackoff.asReactorRetry().scheduler(Schedulers.boundedElastic())) + .retryWhen(configurations.retryBackoff().asReactorRetry().scheduler(Schedulers.boundedElastic())) .block()); return this; } @@ -132,10 +128,10 @@ class GroupRegistration implements Registration { private Mono<Void> createGroupWorkQueue() { return channelPool.createWorkQueue( QueueSpecification.queue(queueName.asString()) - .durable(evaluateDurable(DURABLE, configuration.isQuorumQueuesUsed())) - .exclusive(evaluateExclusive(!EXCLUSIVE, configuration.isQuorumQueuesUsed())) - .autoDelete(evaluateAutoDelete(!AUTO_DELETE, 
configuration.isQuorumQueuesUsed())) - .arguments(configuration.workQueueArgumentsBuilder() + .durable(evaluateDurable(DURABLE, configurations.rabbitMQConfiguration().isQuorumQueuesUsed())) + .exclusive(evaluateExclusive(!EXCLUSIVE, configurations.rabbitMQConfiguration().isQuorumQueuesUsed())) + .autoDelete(evaluateAutoDelete(!AUTO_DELETE, configurations.rabbitMQConfiguration().isQuorumQueuesUsed())) + .arguments(configurations.rabbitMQConfiguration().workQueueArgumentsBuilder() .deadLetter(namingStrategy.deadLetterExchange()) .build())); } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java index 815dd806cf..d20734429a 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/GroupRegistrationHandler.java @@ -38,7 +38,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import org.apache.commons.lang3.tuple.Pair; -import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.util.ReactorUtils; @@ -74,26 +73,24 @@ public class GroupRegistrationHandler { private final ReactorRabbitMQChannelPool channelPool; private final Sender sender; private final ReceiverProvider receiverProvider; - private final RetryBackoffConfiguration retryBackoff; + private final RabbitMQEventBus.Configurations configurations; private final EventDeadLetters eventDeadLetters; private final ListenerExecutor listenerExecutor; - private final RabbitMQConfiguration configuration; private final GroupRegistration.WorkQueueName queueName; private final Scheduler scheduler; private Optional<Disposable> consumer; - GroupRegistrationHandler(NamingStrategy 
namingStrategy, EventSerializer eventSerializer, ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider, - RetryBackoffConfiguration retryBackoff, EventDeadLetters eventDeadLetters, - ListenerExecutor listenerExecutor, RabbitMQConfiguration configuration) { + GroupRegistrationHandler(NamingStrategy namingStrategy, EventSerializer eventSerializer, ReactorRabbitMQChannelPool channelPool, + Sender sender, ReceiverProvider receiverProvider, EventDeadLetters eventDeadLetters, + ListenerExecutor listenerExecutor, RabbitMQEventBus.Configurations configurations) { this.namingStrategy = namingStrategy; this.eventSerializer = eventSerializer; this.channelPool = channelPool; this.sender = sender; this.receiverProvider = receiverProvider; - this.retryBackoff = retryBackoff; this.eventDeadLetters = eventDeadLetters; this.listenerExecutor = listenerExecutor; - this.configuration = configuration; + this.configurations = configurations; this.groupRegistrations = new ConcurrentHashMap<>(); this.queueName = namingStrategy.workQueue(GROUP); this.scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler"); @@ -108,17 +105,17 @@ public class GroupRegistrationHandler { public void start() { channelPool.createWorkQueue( QueueSpecification.queue(queueName.asString()) - .durable(evaluateDurable(DURABLE, configuration.isQuorumQueuesUsed())) - .exclusive(evaluateExclusive(!EXCLUSIVE, configuration.isQuorumQueuesUsed())) - .autoDelete(evaluateAutoDelete(!AUTO_DELETE, configuration.isQuorumQueuesUsed())) - .arguments(configuration.workQueueArgumentsBuilder() + .durable(evaluateDurable(DURABLE, configurations.rabbitMQConfiguration().isQuorumQueuesUsed())) + .exclusive(evaluateExclusive(!EXCLUSIVE, configurations.rabbitMQConfiguration().isQuorumQueuesUsed())) + .autoDelete(evaluateAutoDelete(!AUTO_DELETE, configurations.rabbitMQConfiguration().isQuorumQueuesUsed())) + 
.arguments(configurations.rabbitMQConfiguration().workQueueArgumentsBuilder() .deadLetter(namingStrategy.deadLetterExchange()) .build()), BindingSpecification.binding() .exchange(namingStrategy.exchange()) .queue(queueName.asString()) .routingKey(EMPTY_ROUTING_KEY)) - .retryWhen(retryBackoff.asReactorRetry().scheduler(Schedulers.boundedElastic())) + .retryWhen(configurations.retryBackoff().asReactorRetry().scheduler(Schedulers.boundedElastic())) .block(); this.consumer = Optional.of(consumeWorkQueue()); @@ -196,10 +193,9 @@ public class GroupRegistrationHandler { eventSerializer, listener, group, - retryBackoff, eventDeadLetters, () -> groupRegistrations.remove(group), - listenerExecutor, configuration); + listenerExecutor, configurations); } Collection<Group> registeredGroups() { diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java index 84ab3b88d3..f7d5aad970 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/KeyRegistrationHandler.java @@ -33,7 +33,6 @@ import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.james.backends.rabbitmq.QueueArguments; -import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.ReceiverProvider; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.util.MDCBuilder; @@ -56,11 +55,9 @@ import reactor.rabbitmq.ConsumeOptions; import reactor.rabbitmq.QueueSpecification; import reactor.rabbitmq.Receiver; import reactor.rabbitmq.Sender; -import reactor.util.retry.Retry; class KeyRegistrationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(KeyRegistrationHandler.class); - private static final Duration EXPIRATION_TIMEOUT = Duration.ofMinutes(30); private static final 
Duration TOPOLOGY_CHANGES_TIMEOUT = Duration.ofMinutes(1); @@ -72,8 +69,7 @@ class KeyRegistrationHandler { private final RegistrationQueueName registrationQueue; private final RegistrationBinder registrationBinder; private final ListenerExecutor listenerExecutor; - private final RetryBackoffConfiguration retryBackoff; - private final RabbitMQConfiguration configuration; + private final RabbitMQEventBus.Configurations configurations; private final ReceiverProvider receiverProvider; private Optional<Disposable> receiverSubscriber; private final MetricFactory metricFactory; @@ -83,7 +79,7 @@ class KeyRegistrationHandler { KeyRegistrationHandler(NamingStrategy namingStrategy, EventBusId eventBusId, EventSerializer eventSerializer, Sender sender, ReceiverProvider receiverProvider, RoutingKeyConverter routingKeyConverter, LocalListenerRegistry localListenerRegistry, - ListenerExecutor listenerExecutor, RetryBackoffConfiguration retryBackoff, RabbitMQConfiguration configuration, MetricFactory metricFactory) { + ListenerExecutor listenerExecutor, RabbitMQEventBus.Configurations configurations, MetricFactory metricFactory) { this.eventBusId = eventBusId; this.eventSerializer = eventSerializer; this.sender = sender; @@ -91,12 +87,11 @@ class KeyRegistrationHandler { this.localListenerRegistry = localListenerRegistry; this.receiverProvider = receiverProvider; this.listenerExecutor = listenerExecutor; - this.retryBackoff = retryBackoff; - this.configuration = configuration; this.metricFactory = metricFactory; this.registrationQueue = namingStrategy.queueName(eventBusId); this.registrationBinder = new RegistrationBinder(namingStrategy, sender, registrationQueue); this.receiverSubscriber = Optional.empty(); + this.configurations = configurations; } void start() { @@ -126,24 +121,24 @@ class KeyRegistrationHandler { } private void declareQueue(Sender sender) { - QueueArguments.Builder builder = configuration.workQueueArgumentsBuilder(); - 
configuration.getQueueTTL().ifPresent(builder::queueTTL); + QueueArguments.Builder builder = configurations.rabbitMQConfiguration().workQueueArgumentsBuilder(); + configurations.rabbitMQConfiguration().getQueueTTL().ifPresent(builder::queueTTL); sender.declareQueue( QueueSpecification.queue(registrationQueue.asString()) - .durable(evaluateDurable(configuration.isEventBusNotificationDurabilityEnabled(), configuration.isQuorumQueuesUsed())) - .exclusive(evaluateExclusive(!EXCLUSIVE, configuration.isQuorumQueuesUsed())) - .autoDelete(evaluateAutoDelete(AUTO_DELETE, configuration.isQuorumQueuesUsed())) + .durable(evaluateDurable(configurations.rabbitMQConfiguration().isEventBusNotificationDurabilityEnabled(), configurations.rabbitMQConfiguration().isQuorumQueuesUsed())) + .exclusive(evaluateExclusive(!EXCLUSIVE, configurations.rabbitMQConfiguration().isQuorumQueuesUsed())) + .autoDelete(evaluateAutoDelete(AUTO_DELETE, configurations.rabbitMQConfiguration().isQuorumQueuesUsed())) .arguments(builder.build())) .timeout(TOPOLOGY_CHANGES_TIMEOUT) .map(AMQP.Queue.DeclareOk::getQueue) - .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor())) + .retryWhen(configurations.retryBackoff().asReactorRetry()) .block(); } void stop() { sender.delete(QueueSpecification.queue(registrationQueue.asString())) .timeout(TOPOLOGY_CHANGES_TIMEOUT) - .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.parallel())) + .retryWhen(configurations.retryBackoff().asReactorRetry().scheduler(Schedulers.parallel())) .block(); receiverSubscriber.filter(Predicate.not(Disposable::isDisposed)) .ifPresent(Disposable::dispose); @@ -158,7 +153,7 @@ class KeyRegistrationHandler { if (registration.unregister().lastListenerRemoved()) { return Mono.from(metricFactory.decoratePublisherWithTimerMetric("rabbit-unregister", registrationBinder.unbind(key) 
.timeout(TOPOLOGY_CHANGES_TIMEOUT) - .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic())))) + .retryWhen(configurations.retryBackoff().asReactorRetry().scheduler(Schedulers.boundedElastic())))) // Unbind is potentially blocking .subscribeOn(Schedulers.boundedElastic()); } @@ -172,7 +167,7 @@ class KeyRegistrationHandler { // Bind is potentially blocking .subscribeOn(Schedulers.boundedElastic()) .timeout(TOPOLOGY_CHANGES_TIMEOUT) - .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic())); + .retryWhen(configurations.retryBackoff().asReactorRetry().scheduler(Schedulers.boundedElastic())); } return Mono.empty(); } diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java index f8d00d49de..009613683e 100644 --- a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java +++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBus.java @@ -43,17 +43,20 @@ public class RabbitMQEventBus implements EventBus, Startable { private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running"; static final String EVENT_BUS_ID = "eventBusId"; + public record Configurations(RabbitMQConfiguration rabbitMQConfiguration, RetryBackoffConfiguration retryBackoff) { + + } + private final NamingStrategy namingStrategy; private final EventSerializer eventSerializer; private final RoutingKeyConverter routingKeyConverter; - private final RetryBackoffConfiguration retryBackoff; private final EventBusId eventBusId; private final EventDeadLetters eventDeadLetters; private final ListenerExecutor listenerExecutor; private final Sender sender; private final ReceiverProvider receiverProvider; private 
final ReactorRabbitMQChannelPool channelPool; - private final RabbitMQConfiguration configuration; + private final Configurations configurations; private final MetricFactory metricFactory; private volatile boolean isRunning; @@ -63,10 +66,9 @@ public class RabbitMQEventBus implements EventBus, Startable { private EventDispatcher eventDispatcher; public RabbitMQEventBus(NamingStrategy namingStrategy, Sender sender, ReceiverProvider receiverProvider, EventSerializer eventSerializer, - RetryBackoffConfiguration retryBackoff, RoutingKeyConverter routingKeyConverter, EventDeadLetters eventDeadLetters, MetricFactory metricFactory, ReactorRabbitMQChannelPool channelPool, - EventBusId eventBusId, RabbitMQConfiguration configuration) { + EventBusId eventBusId, Configurations configurations) { this.namingStrategy = namingStrategy; this.sender = sender; this.receiverProvider = receiverProvider; @@ -75,9 +77,8 @@ public class RabbitMQEventBus implements EventBus, Startable { this.eventBusId = eventBusId; this.eventSerializer = eventSerializer; this.routingKeyConverter = routingKeyConverter; - this.retryBackoff = retryBackoff; this.eventDeadLetters = eventDeadLetters; - this.configuration = configuration; + this.configurations = configurations; this.metricFactory = metricFactory; this.isRunning = false; this.isStopping = false; @@ -88,9 +89,9 @@ public class RabbitMQEventBus implements EventBus, Startable { if (!isRunning && !isStopping) { LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); - keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff, configuration, metricFactory); - groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, configuration); - eventDispatcher = new 
EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters, configuration); + keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, configurations, metricFactory); + groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, eventDeadLetters, listenerExecutor, configurations); + eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters, configurations.rabbitMQConfiguration()); eventDispatcher.start(); keyRegistrationHandler.start(); @@ -109,9 +110,9 @@ public class RabbitMQEventBus implements EventBus, Startable { if (!isRunning && !isStopping) { LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry(); - keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, retryBackoff, configuration, metricFactory); - groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, configuration); - eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters, configuration); + keyRegistrationHandler = new KeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, sender, receiverProvider, routingKeyConverter, localListenerRegistry, listenerExecutor, configurations, metricFactory); + groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, eventDeadLetters, listenerExecutor, configurations); + 
eventDispatcher = new EventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters, configurations.rabbitMQConfiguration()); keyRegistrationHandler.declareQueue(); diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java index 5b9e83cbb1..8ac0ad8ea4 100644 --- a/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java +++ b/event-bus/distributed/src/test/java/org/apache/james/events/NetworkErrorTest.java @@ -53,13 +53,12 @@ class NetworkErrorTest { void setUp() throws Exception { eventDeadLetters = new MemoryEventDeadLetters(); - EventSerializer eventSerializer = new TestEventSerializer(); RoutingKeyConverter routingKeyConverter = RoutingKeyConverter.forFactories(new EventBusTestFixture.TestRegistrationKeyFactory()); eventBus = new RabbitMQEventBus(new NamingStrategy(new EventBusName("test")), rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), - eventSerializer, RETRY_BACKOFF_CONFIGURATION, routingKeyConverter, + new TestEventSerializer(), routingKeyConverter, eventDeadLetters, new RecordingMetricFactory(), rabbitMQExtension.getRabbitChannelPool(), - EventBusId.random(), rabbitMQExtension.getRabbitMQ().getConfiguration()); + EventBusId.random(), new RabbitMQEventBus.Configurations(rabbitMQExtension.getRabbitMQ().getConfiguration(), RETRY_BACKOFF_CONFIGURATION)); eventBus.start(); } @@ -108,9 +107,9 @@ class NetworkErrorTest { .eventBusPropagateDispatchError(false) .build(); eventBus = new RabbitMQEventBus(new NamingStrategy(new EventBusName("test")), rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), - new TestEventSerializer(), RETRY_BACKOFF_CONFIGURATION, RoutingKeyConverter.forFactories(new EventBusTestFixture.TestRegistrationKeyFactory()), + new TestEventSerializer(), RoutingKeyConverter.forFactories(new 
EventBusTestFixture.TestRegistrationKeyFactory()), eventDeadLetters, new RecordingMetricFactory(), rabbitMQExtension.getRabbitChannelPool(), - EventBusId.random(), disablePropagateDispatchError); + EventBusId.random(), new RabbitMQEventBus.Configurations(disablePropagateDispatchError, RETRY_BACKOFF_CONFIGURATION)); eventBus.start(); assertThat(eventDeadLetters.containEvents().block()).isFalse(); diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java index ebbd94ce48..bfd35ddf3c 100644 --- a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java +++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueUpgradeTest.java @@ -59,9 +59,8 @@ class RabbitMQEventBusDeadLetterQueueUpgradeTest { RoutingKeyConverter routingKeyConverter = RoutingKeyConverter.forFactories(new TestRegistrationKeyFactory()); eventBus = new RabbitMQEventBus(NAMING_STRATEGY, rabbitMQExtension.getSender(), rabbitMQExtension.getReceiverProvider(), - eventSerializer, RETRY_BACKOFF_CONFIGURATION, routingKeyConverter, - memoryEventDeadLetters, new RecordingMetricFactory(), rabbitMQExtension.getRabbitChannelPool(), - EventBusId.random(), rabbitMQExtension.getRabbitMQ().getConfiguration()); + eventSerializer, routingKeyConverter, memoryEventDeadLetters, new RecordingMetricFactory(), rabbitMQExtension.getRabbitChannelPool(), + EventBusId.random(), new RabbitMQEventBus.Configurations(rabbitMQExtension.getRabbitMQ().getConfiguration(), RETRY_BACKOFF_CONFIGURATION)); eventBus.start(); } diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java index 9e88388eef..7c8786154c 100644 --- 
a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java +++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusTest.java @@ -159,10 +159,9 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, } private RabbitMQEventBus newEventBus(NamingStrategy namingStrategy, Sender sender, ReceiverProvider receiverProvider) throws Exception { - return new RabbitMQEventBus(namingStrategy, sender, receiverProvider, eventSerializer, - EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION, routingKeyConverter, - memoryEventDeadLetters, new RecordingMetricFactory(), - rabbitMQExtension.getRabbitChannelPool(), EventBusId.random(), rabbitMQExtension.getRabbitMQ().getConfiguration()); + return new RabbitMQEventBus(namingStrategy, sender, receiverProvider, eventSerializer, routingKeyConverter, + memoryEventDeadLetters, new RecordingMetricFactory(), rabbitMQExtension.getRabbitChannelPool(), EventBusId.random(), + new RabbitMQEventBus.Configurations(rabbitMQExtension.getRabbitMQ().getConfiguration(), EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION)); } @Override diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusUsingQuorumQueueTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusUsingQuorumQueueTest.java index 773b00d188..788ef1d739 100644 --- a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusUsingQuorumQueueTest.java +++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusUsingQuorumQueueTest.java @@ -110,9 +110,9 @@ class RabbitMQEventBusUsingQuorumQueueTest implements GroupContract.SingleEventB private RabbitMQEventBus newEventBus(NamingStrategy namingStrategy, Sender sender, ReceiverProvider receiverProvider) throws Exception { return new RabbitMQEventBus(namingStrategy, sender, receiverProvider, eventSerializer, - EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION, 
routingKeyConverter, - memoryEventDeadLetters, new RecordingMetricFactory(), - rabbitMQExtension.getRabbitChannelPool(), EventBusId.random(), rabbitMQExtension.getRabbitMQ().withQuorumQueueConfiguration()); + routingKeyConverter, memoryEventDeadLetters, new RecordingMetricFactory(), + rabbitMQExtension.getRabbitChannelPool(), EventBusId.random(), + new RabbitMQEventBus.Configurations(rabbitMQExtension.getRabbitMQ().withQuorumQueueConfiguration(), EventBusTestFixture.RETRY_BACKOFF_CONFIGURATION)); } @Override diff --git a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java index 7a20f78d03..8b79198cc1 100644 --- a/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java +++ b/mpt/impl/imap-mailbox/rabbitmq/src/test/java/org/apache/james/mpt/imapmailbox/rabbitmq/host/RabbitMQEventBusHostSystem.java @@ -119,9 +119,8 @@ public class RabbitMQEventBusHostSystem extends JamesImapHostSystem { MailboxEventSerializer eventSerializer = new MailboxEventSerializer(mailboxIdFactory, messageIdFactory, new DefaultUserQuotaRootResolver.DefaultQuotaRootDeserializer()); RoutingKeyConverter routingKeyConverter = new RoutingKeyConverter(ImmutableSet.of(new MailboxIdRegistrationKey.Factory(mailboxIdFactory))); return new RabbitMQEventBus(MAILBOX_EVENT_NAMING_STRATEGY, reactorRabbitMQChannelPool.getSender(), reactorRabbitMQChannelPool::createReceiver, - eventSerializer, RetryBackoffConfiguration.DEFAULT, routingKeyConverter, new MemoryEventDeadLetters(), - new RecordingMetricFactory(), - reactorRabbitMQChannelPool, EventBusId.random(), dockerRabbitMQ.getConfiguration()); + eventSerializer, routingKeyConverter, new MemoryEventDeadLetters(), new RecordingMetricFactory(), + reactorRabbitMQChannelPool, EventBusId.random(), new 
RabbitMQEventBus.Configurations(dockerRabbitMQ.getConfiguration(), RetryBackoffConfiguration.DEFAULT)); } @Override diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java index 5e14f09063..a5ae0f6018 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/ContentDeletionEventBusModule.java @@ -114,8 +114,8 @@ public class ContentDeletionEventBusModule extends AbstractModule { RabbitMQConfiguration configuration) { return new RabbitMQEventBus( CONTENT_DELETION_NAMING_STRATEGY, - sender, receiverProvider, eventSerializer, retryBackoffConfiguration, new RoutingKeyConverter(ImmutableSet.of(new Factory())), - eventDeadLetters, metricFactory, channelPool, eventBusId, configuration); + sender, receiverProvider, eventSerializer, new RoutingKeyConverter(ImmutableSet.of(new Factory())), + eventDeadLetters, metricFactory, channelPool, eventBusId, new RabbitMQEventBus.Configurations(configuration, retryBackoffConfiguration)); } @Provides diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java index 4fe708dcd8..14a83f7e11 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java @@ -108,8 +108,8 @@ public class JMAPEventBusModule extends AbstractModule { RabbitMQConfiguration configuration) { return new RabbitMQEventBus( JMAP_NAMING_STRATEGY, - sender, receiverProvider, eventSerializer, 
retryBackoffConfiguration, new RoutingKeyConverter(ImmutableSet.of(new Factory())), - eventDeadLetters, metricFactory, channelPool, eventBusId, configuration); + sender, receiverProvider, eventSerializer, new RoutingKeyConverter(ImmutableSet.of(new Factory())), eventDeadLetters, + metricFactory, channelPool, eventBusId, new RabbitMQEventBus.Configurations(configuration, retryBackoffConfiguration)); } @Provides diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java index 9d97bacc28..b186463484 100644 --- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java +++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/MailboxEventBusModule.java @@ -100,8 +100,8 @@ public class MailboxEventBusModule extends AbstractModule { RoutingKeyConverter routingKeyConverter, EventDeadLetters eventDeadLetters, MetricFactory metricFactory, ReactorRabbitMQChannelPool channelPool, EventBusId eventBusId, RabbitMQConfiguration configuration) { - return new RabbitMQEventBus(namingStrategy, sender, receiverProvider, eventSerializer, retryBackoff, routingKeyConverter, - eventDeadLetters, metricFactory, channelPool, eventBusId, configuration); + return new RabbitMQEventBus(namingStrategy, sender, receiverProvider, eventSerializer, routingKeyConverter, + eventDeadLetters, metricFactory, channelPool, eventBusId, new RabbitMQEventBus.Configurations(configuration, retryBackoff)); } @Provides --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
