Repository: james-project
Updated Branches:
  refs/heads/master d87517fb8 -> 790a203ce


JAMES-2551 Closing connections of RabbitMQChannelPool when shutting down this 
pool

Previously, only the channels were closed. This produced many error logs in
tests, because the connections were still left open after each test.


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/e410971d
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/e410971d
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/e410971d

Branch: refs/heads/master
Commit: e410971d61e78cb9acfd42cf016fcf75bc8baeec
Parents: d87517f
Author: duc <[email protected]>
Authored: Mon Oct 8 16:50:41 2018 +0700
Committer: tran tien duc <[email protected]>
Committed: Thu Oct 25 10:10:42 2018 +0700

----------------------------------------------------------------------
 .../backend/rabbitmq/RabbitChannelPoolImpl.java | 19 +++++++++-----
 .../rabbitmq/RabbitMQHealthCheckTest.java       | 27 +++++---------------
 .../queue/rabbitmq/RabbitMQMailQueueTest.java   | 14 +++-------
 3 files changed, 23 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/e410971d/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
index fcfdcba..ed58f6c 100644
--- 
a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
+++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.backend.rabbitmq;
 
+import java.io.IOException;
 import java.util.function.Supplier;
 
 import javax.annotation.PreDestroy;
@@ -38,16 +39,16 @@ import com.rabbitmq.client.Connection;
 public class RabbitChannelPoolImpl implements RabbitMQChannelPool {
 
     private static class ChannelBasePooledObjectFactory extends 
BasePooledObjectFactory<Channel> {
-        private final Supplier<Connection> connection;
+        private final Supplier<Connection> rabbitConnection;
 
         public ChannelBasePooledObjectFactory(RabbitMQConnectionFactory 
factory) {
-            this.connection = MemoizedSupplier.of(
-                    Throwing.supplier(() -> factory.create()).sneakyThrow());
+            this.rabbitConnection = MemoizedSupplier.of(
+                    Throwing.supplier(factory::create).sneakyThrow());
         }
 
         @Override
         public Channel create() throws Exception {
-            return connection.get()
+            return rabbitConnection.get()
                     .createChannel();
         }
 
@@ -64,11 +65,12 @@ public class RabbitChannelPoolImpl implements 
RabbitMQChannelPool {
     }
 
     private final ObjectPool<Channel> pool;
+    private final ChannelBasePooledObjectFactory pooledObjectFactory;
 
     @Inject
     public RabbitChannelPoolImpl(RabbitMQConnectionFactory factory) {
-        pool = new GenericObjectPool<>(
-            new ChannelBasePooledObjectFactory(factory));
+        pooledObjectFactory = new ChannelBasePooledObjectFactory(factory);
+        pool = new GenericObjectPool<>(pooledObjectFactory);
     }
 
     @Override
@@ -92,8 +94,11 @@ public class RabbitChannelPoolImpl implements 
RabbitMQChannelPool {
     }
 
     @PreDestroy
-    public void close() {
+    public void close() throws IOException {
         pool.close();
+        pooledObjectFactory.rabbitConnection
+            .get()
+            .close();
     }
 
     private Channel borrowChannel() {

http://git-wip-us.apache.org/repos/asf/james-project/blob/e410971d/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
----------------------------------------------------------------------
diff --git 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
index b88ef8f..ff85217 100644
--- 
a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
+++ 
b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java
@@ -19,36 +19,23 @@
 
 package org.apache.james.backend.rabbitmq;
 
-import static 
org.apache.james.backend.rabbitmq.RabbitMQFixture.DEFAULT_MANAGEMENT_CREDENTIAL;
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.concurrent.Executors;
-
 import org.apache.james.core.healthcheck.Result;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
-@ExtendWith(RabbitMQExtension.class)
 class RabbitMQHealthCheckTest {
-    private RabbitMQHealthCheck healthCheck;
 
-    @BeforeEach
-    void setUp(DockerRabbitMQ rabbitMQ) throws Exception {
+    @RegisterExtension
+    static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
 
-        RabbitMQConfiguration rabbitMQConfiguration = 
RabbitMQConfiguration.builder()
-            .amqpUri(rabbitMQ.amqpUri())
-            .managementUri(rabbitMQ.managementUri())
-            .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL)
-            .build();
-
-        RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(rabbitMQConfiguration,
-                new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
+    private RabbitMQHealthCheck healthCheck;
 
-        healthCheck = new RabbitMQHealthCheck(
-            new RabbitChannelPoolImpl(rabbitMQConnectionFactory));
+    @BeforeEach
+    void setUp() throws Exception {
+        healthCheck = new 
RabbitMQHealthCheck(rabbitMQExtension.getRabbitChannelPool());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/james-project/blob/e410971d/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
----------------------------------------------------------------------
diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index bb06e36..909e787 100644
--- 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ 
b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -26,7 +26,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Function;
 import java.util.stream.IntStream;
@@ -36,9 +35,7 @@ import javax.mail.internet.MimeMessage;
 
 import org.apache.james.backend.rabbitmq.DockerRabbitMQ;
 import org.apache.james.backend.rabbitmq.RabbitMQConfiguration;
-import org.apache.james.backend.rabbitmq.RabbitMQConnectionFactory;
 import org.apache.james.backend.rabbitmq.RabbitMQExtension;
-import org.apache.james.backend.rabbitmq.SimpleChannelPool;
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
 import org.apache.james.backends.cassandra.components.CassandraModule;
@@ -63,13 +60,10 @@ import org.apache.james.util.streams.Iterators;
 import org.apache.mailet.Mail;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import com.github.fge.lambdas.Throwing;
-import com.nurkiewicz.asyncretry.AsyncRetryExecutor;
 
-@ExtendWith(RabbitMQExtension.class)
 public class RabbitMQMailQueueTest implements ManageableMailQueueContract, 
MailQueueMetricContract {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new 
HashBlobId.Factory();
     private static final int THREE_BUCKET_COUNT = 3;
@@ -88,6 +82,9 @@ public class RabbitMQMailQueueTest implements 
ManageableMailQueueContract, MailQ
         CassandraMailQueueViewModule.MODULE,
         CassandraEventStoreModule.MODULE));
 
+    @RegisterExtension
+    static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension();
+
     private RabbitMQMailQueueFactory mailQueueFactory;
     private UpdatableTickingClock clock;
     private RabbitMQMailQueue mailQueue;
@@ -124,10 +121,7 @@ public class RabbitMQMailQueueTest implements 
ManageableMailQueueContract, MailQ
             .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL)
             .build();
 
-        RabbitMQConnectionFactory rabbitMQConnectionFactory = new 
RabbitMQConnectionFactory(rabbitMQConfiguration,
-                new 
AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor()));
-
-        RabbitClient rabbitClient = new RabbitClient(new 
SimpleChannelPool(rabbitMQConnectionFactory));
+        RabbitClient rabbitClient = new 
RabbitClient(rabbitMQExtension.getRabbitChannelPool());
         RabbitMQMailQueueFactory.PrivateFactory factory = new 
RabbitMQMailQueueFactory.PrivateFactory(
             metricTestSystem.getSpyMetricFactory(),
             metricTestSystem.getSpyGaugeRegistry(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to