JAMES-2659 re-implement guard logic for RabbitEventBus start/stop
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/52ec5dbb Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/52ec5dbb Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/52ec5dbb Branch: refs/heads/master Commit: 52ec5dbb2065d17dc3ffa04f864c32332609e282 Parents: d086637 Author: Matthieu Baechler <[email protected]> Authored: Thu Feb 7 17:07:21 2019 +0100 Committer: Matthieu Baechler <[email protected]> Committed: Fri Feb 8 10:19:55 2019 +0100 ---------------------------------------------------------------------- .../james/mailbox/events/RabbitMQEventBus.java | 36 +++++++++++++------- 1 file changed, 23 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/52ec5dbb/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java index 2a9a07d..c800850 100644 --- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java +++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/RabbitMQEventBus.java @@ -20,7 +20,6 @@ package org.apache.james.mailbox.events; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.PreDestroy; import javax.inject.Inject; @@ -31,7 +30,6 @@ import org.apache.james.metrics.api.MetricFactory; import com.github.fge.lambdas.Throwing; import com.rabbitmq.client.Connection; - import reactor.core.publisher.Mono; import reactor.rabbitmq.RabbitFlux; import reactor.rabbitmq.Sender; @@ -44,13 +42,14 @@ public class RabbitMQEventBus implements EventBus { private final Mono<Connection> connectionMono; private final EventSerializer eventSerializer; - private final AtomicBoolean isRunning; private final RoutingKeyConverter routingKeyConverter; private final RetryBackoffConfiguration retryBackoff; private final EventBusId eventBusId; private final EventDeadLetters eventDeadLetters; private final MailboxListenerExecutor mailboxListenerExecutor; + private volatile boolean isRunning; + private volatile boolean isStopping; private GroupRegistrationHandler groupRegistrationHandler; private KeyRegistrationHandler keyRegistrationHandler; private EventDispatcher eventDispatcher; @@ -68,11 +67,12 @@ public class RabbitMQEventBus implements EventBus { this.routingKeyConverter = routingKeyConverter; this.retryBackoff = retryBackoff; this.eventDeadLetters = eventDeadLetters; - this.isRunning = new AtomicBoolean(false); + this.isRunning = false; + this.isStopping = false; } public void start() { - if (!isRunning.get()) { + if (!isRunning && !isStopping) { sender = RabbitFlux.createSender(new SenderOptions().connectionMono(connectionMono) .resourceManagementChannelMono(connectionMono.map(Throwing.function(Connection::createChannel)))); MailboxListenerRegistry mailboxListenerRegistry = new MailboxListenerRegistry(); @@ -82,35 +82,45 @@ public class RabbitMQEventBus implements EventBus { eventDispatcher.start(); keyRegistrationHandler.start(); - isRunning.set(true); + isRunning = true; } } @PreDestroy public void stop() { - if (isRunning.get()) { + if (isRunning && !isStopping) { + isStopping = true; + isRunning = false; groupRegistrationHandler.stop(); keyRegistrationHandler.stop(); sender.close(); - isRunning.set(false); } } @Override public Registration register(MailboxListener listener, RegistrationKey key) { - return keyRegistrationHandler.register(listener, key); + if (isRunning) { + return keyRegistrationHandler.register(listener, key); + } + throw new IllegalStateException("Event Bus is not running"); } @Override public Registration register(MailboxListener listener, Group group) { - return groupRegistrationHandler.register(listener, group); + if (isRunning) { + return groupRegistrationHandler.register(listener, group); + } + throw new IllegalStateException("Event Bus is not running"); } @Override public Mono<Void> dispatch(Event event, Set<RegistrationKey> key) { - if (!event.isNoop()) { - return eventDispatcher.dispatch(event, key); + if (isRunning) { + if (!event.isNoop()) { + return eventDispatcher.dispatch(event, key); + } + return Mono.empty(); } - return Mono.empty(); + throw new IllegalStateException("Event Bus is not running"); } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
