This is an automated email from the ASF dual-hosted git repository.

aduprat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 87136c39e011e6d0197e3a0e85a8203c1e861e35
Author: Matthieu Baechler <matth...@apache.org>
AuthorDate: Fri Feb 15 09:14:26 2019 +0100

    JAMES-2817 Rename subscribeWorkQueue to consumeWorkQueue and subscribe in 
start()
---
 .../apache/james/mailbox/events/GroupRegistration.java | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
index bbdd46b..7730efc 100644
--- 
a/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
+++ 
b/mailbox/event/event-rabbitmq/src/main/java/org/apache/james/mailbox/events/GroupRegistration.java
@@ -37,7 +37,6 @@ import org.apache.james.util.MDCBuilder;
 import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 import com.rabbitmq.client.Connection;
-
 import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -105,10 +104,11 @@ class GroupRegistration implements Registration {
     }
 
     GroupRegistration start() {
-        createGroupWorkQueue()
-            .then(retryHandler.createRetryExchange(queueName))
-            .doOnSuccess(any -> this.subscribeWorkQueue())
-            .block();
+        receiverSubscriber = Optional
+            .of(createGroupWorkQueue()
+                .then(retryHandler.createRetryExchange(queueName))
+                .then(Mono.fromCallable(() -> this.consumeWorkQueue()))
+                .block());
         return this;
     }
 
@@ -126,12 +126,12 @@ class GroupRegistration implements Registration {
             .then();
     }
 
-    private void subscribeWorkQueue() {
-        receiverSubscriber = 
Optional.of(receiver.consumeManualAck(queueName.asString(), new 
ConsumeOptions().qos(EventBus.EXECUTION_RATE))
+    private Disposable consumeWorkQueue() {
+        return receiver.consumeManualAck(queueName.asString(), new 
ConsumeOptions().qos(EventBus.EXECUTION_RATE))
+            .publishOn(Schedulers.parallel())
             .filter(delivery -> Objects.nonNull(delivery.getBody()))
             .flatMap(this::deliver)
-            .subscribeOn(Schedulers.elastic())
-            .subscribe());
+            .subscribe();
     }
 
     private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to