This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 147aded8fac341ecdcd86026eb8654dc85b56561 Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Tue Jul 7 08:30:41 2020 +0700 JAMES-3299 RabbitMQ EventBus key registration should not hang Debugging proved bind / unbind operation can hang. Upon such hangs, timing out will allow to retry the given operation. --- .../james/mailbox/events/KeyRegistrationHandler.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java index 5add858..8b79d06 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/KeyRegistrationHandler.java @@ -58,6 +58,8 @@ class KeyRegistrationHandler { private static final Duration EXPIRATION_TIMEOUT = Duration.ofMinutes(30); private static final Map<String, Object> QUEUE_ARGUMENTS = ImmutableMap.of("x-expires", EXPIRATION_TIMEOUT.toMillis()); + private static final Duration TOPOLOGY_CHANGES_TIMEOUT = Duration.ofMinutes(1); + private final EventBusId eventBusId; private final LocalListenerRegistry localListenerRegistry; private final EventSerializer eventSerializer; @@ -100,11 +102,13 @@ class KeyRegistrationHandler { @VisibleForTesting void declareQueue() { - sender.declareQueue(QueueSpecification.queue(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString()) - .durable(DURABLE) - .exclusive(!EXCLUSIVE) - .autoDelete(AUTO_DELETE) - .arguments(QUEUE_ARGUMENTS)) + sender.declareQueue( + QueueSpecification.queue(EVENTBUS_QUEUE_NAME_PREFIX + eventBusId.asString()) + .durable(DURABLE) + .exclusive(!EXCLUSIVE) + .autoDelete(AUTO_DELETE) + .arguments(QUEUE_ARGUMENTS)) + .timeout(TOPOLOGY_CHANGES_TIMEOUT) .map(AMQP.Queue.DeclareOk::getQueue) .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor())) .doOnSuccess(queueName -> { @@ -121,6 +125,7 @@ class KeyRegistrationHandler { .ifPresent(Disposable::dispose); receiver.close(); sender.delete(QueueSpecification.queue(registrationQueue.asString())) + .timeout(TOPOLOGY_CHANGES_TIMEOUT) .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())) .block(); } @@ -132,6 +137,7 @@ class KeyRegistrationHandler { .thenReturn(new KeyRegistration(() -> { if (registration.unregister().lastListenerRemoved()) { registrationBinder.unbind(key) + .timeout(TOPOLOGY_CHANGES_TIMEOUT) .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())) .subscribeOn(Schedulers.elastic()) .block(); @@ -142,6 +148,7 @@ class KeyRegistrationHandler { private Mono<Void> registerIfNeeded(RegistrationKey key, LocalListenerRegistry.LocalRegistration registration) { if (registration.isFirstListener()) { return registrationBinder.bind(key) + .timeout(TOPOLOGY_CHANGES_TIMEOUT) .retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.elastic())); } return Mono.empty(); --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org