This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 92f9e6543c09bc3fdd818b5598afff1039d1d73f Author: Benoit Tellier <[email protected]> AuthorDate: Wed Jun 19 14:41:45 2019 +0700 JAMES-2794 Solve some "channel closed" exceptions on topof RabbitMQ MailQueue Doing the ack operations out of the stream caused some 'channel closed' exception to arise. --- .../org/apache/james/queue/api/ManageableMailQueueContract.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java index c911e64..521bd44 100644 --- a/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java +++ b/server/queue/queue-api/src/test/java/org/apache/james/queue/api/ManageableMailQueueContract.java @@ -40,6 +40,7 @@ import org.apache.mailet.Mail; import org.apache.mailet.base.MailAddressFixture; import org.junit.jupiter.api.Test; +import com.github.fge.lambdas.Throwing; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; @@ -79,7 +80,9 @@ public interface ManageableMailQueueContract extends MailQueueContract { default void dequeueShouldDecreaseQueueSize() throws Exception { enQueue(defaultMail().name("name").build()); - Flux.from(getManageableMailQueue().deQueue()).blockFirst().done(true); + Flux.from(getManageableMailQueue().deQueue()) + .doOnNext(Throwing.consumer(item -> item.done(true))) + .blockFirst(); long size = getManageableMailQueue().getSize(); @@ -90,7 +93,9 @@ public interface ManageableMailQueueContract extends MailQueueContract { default void noAckShouldNotDecreaseSize() throws Exception { enQueue(defaultMail().name("name").build()); - Flux.from(getManageableMailQueue().deQueue()).blockFirst().done(false); + Flux.from(getManageableMailQueue().deQueue()) + .doOnNext(Throwing.consumer(item -> item.done(false))) + .blockFirst(); long size = getManageableMailQueue().getSize(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
