This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 92f9e6543c09bc3fdd818b5598afff1039d1d73f
Author: Benoit Tellier <[email protected]>
AuthorDate: Wed Jun 19 14:41:45 2019 +0700

    JAMES-2794 Solve some "channel closed" exceptions on top of RabbitMQ MailQueue
    
    Doing the ack operations outside of the stream caused some 'channel closed'
    exceptions to arise.
---
 .../org/apache/james/queue/api/ManageableMailQueueContract.java  | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
index c911e64..521bd44 100644
--- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
+++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java
@@ -40,6 +40,7 @@ import org.apache.mailet.Mail;
 import org.apache.mailet.base.MailAddressFixture;
 import org.junit.jupiter.api.Test;
 
+import com.github.fge.lambdas.Throwing;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 
@@ -79,7 +80,9 @@ public interface ManageableMailQueueContract extends MailQueueContract {
     default void dequeueShouldDecreaseQueueSize() throws Exception {
         enQueue(defaultMail().name("name").build());
 
-        Flux.from(getManageableMailQueue().deQueue()).blockFirst().done(true);
+        Flux.from(getManageableMailQueue().deQueue())
+            .doOnNext(Throwing.consumer(item -> item.done(true)))
+            .blockFirst();
 
         long size = getManageableMailQueue().getSize();
 
@@ -90,7 +93,9 @@ public interface ManageableMailQueueContract extends MailQueueContract {
     default void noAckShouldNotDecreaseSize() throws Exception {
         enQueue(defaultMail().name("name").build());
 
-        Flux.from(getManageableMailQueue().deQueue()).blockFirst().done(false);
+        Flux.from(getManageableMailQueue().deQueue())
+            .doOnNext(Throwing.consumer(item -> item.done(false)))
+            .blockFirst();
 
         long size = getManageableMailQueue().getSize();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to