This is an automated email from the ASF dual-hosted git repository. aduprat pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 87136c39e011e6d0197e3a0e85a8203c1e861e35
Author: Matthieu Baechler <matth...@apache.org>
AuthorDate: Fri Feb 15 09:14:26 2019 +0100

    JAMES-2817 Rename subscribeWorkQueue to consumeWorkQueue and subscribe in start()
---
 .../apache/james/mailbox/events/GroupRegistration.java | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index bbdd46b..7730efc 100644
--- a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -37,7 +37,6 @@ import org.apache.james.util.MDCBuilder;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 import com.rabbitmq.client.Connection;
-
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -105,10 +104,11 @@ class GroupRegistration implements Registration {
     }
 
     GroupRegistration start() {
-        createGroupWorkQueue()
-            .then(retryHandler.createRetryExchange(queueName))
-            .doOnSuccess(any -> this.subscribeWorkQueue())
-            .block();
+        receiverSubscriber = Optional
+            .of(createGroupWorkQueue()
+                .then(retryHandler.createRetryExchange(queueName))
+                .then(Mono.fromCallable(() -> this.consumeWorkQueue()))
+                .block());
         return this;
     }
 
@@ -126,12 +126,12 @@ class GroupRegistration implements Registration {
             .then();
     }
 
-    private void subscribeWorkQueue() {
-        receiverSubscriber = Optional.of(receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
+    private Disposable consumeWorkQueue() {
+        return receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE))
+            .publishOn(Schedulers.parallel())
             .filter(delivery -> Objects.nonNull(delivery.getBody()))
             .flatMap(this::deliver)
-            .subscribeOn(Schedulers.elastic())
-            .subscribe());
+            .subscribe();
     }
 
     private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org