JAMES-2551 Revert to the old `RabbitMQMailQueueTest`, which uses `SimpleChannelPool`
- To avoid rabbitmq concurrent issue of `RabbitMQChannelPoolImpl`. - close `SimpleChannelPool`'s channels after each test Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/6f76f9f8 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/6f76f9f8 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/6f76f9f8 Branch: refs/heads/master Commit: 6f76f9f8d693214323885541fd3b0e333c416ca2 Parents: aa7882d Author: duc <[email protected]> Authored: Wed Oct 10 17:54:13 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Thu Oct 25 15:07:52 2018 +0700 ---------------------------------------------------------------------- .../backend/rabbitmq/RabbitChannelPoolImpl.java | 19 +++++++++++++++---- .../backend/rabbitmq/RabbitMQChannelPool.java | 2 ++ .../backend/rabbitmq/SimpleChannelPool.java | 15 ++++++++++++++- .../backend/rabbitmq/RabbitMQExtension.java | 17 ++++++++++++----- .../rabbitmq/RabbitMQHealthCheckTest.java | 2 +- ...RabbitMQMailQueueConfigurationChangeTest.java | 2 +- .../queue/rabbitmq/RabbitMQMailQueueTest.java | 2 +- .../rabbitmq/RabbitMqMailQueueFactoryTest.java | 2 +- 8 files changed, 47 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/6f76f9f8/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java index f5e6934..9a5be46 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java +++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitChannelPoolImpl.java @@ -19,6 +19,8 @@ package org.apache.james.backend.rabbitmq; +import java.io.IOException; + import javax.annotation.PreDestroy; import javax.inject.Inject; @@ -61,12 +63,20 @@ public class RabbitChannelPoolImpl implements RabbitMQChannelPool { @Override public void destroyObject(PooledObject<Channel> pooledObject) throws Exception { Channel channel = pooledObject.getObject(); - channel.close(); + if (channel.isOpen()) { + channel.close(); + } } - private void closeRabbitConnection() { + private void closeRabbitConnectionIfInitialized() { rabbitConnectionSupplier.ifInitialized( - Throwing.<Connection>consumer(Connection::close).sneakyThrow()); + Throwing.consumer(this::closeRabbitConnection).sneakyThrow()); + } + + private void closeRabbitConnection(Connection connection) throws IOException { + if (connection.isOpen()) { + connection.close(); + } } } @@ -100,10 +110,11 @@ public class RabbitChannelPoolImpl implements RabbitMQChannelPool { } @PreDestroy + @Override public void close() { try { pool.close(); - pooledChannelsFactory.closeRabbitConnection(); + pooledChannelsFactory.closeRabbitConnectionIfInitialized(); } catch (Exception e) { LOGGER.error("error while closing rabbit channels & connections", e); } http://git-wip-us.apache.org/repos/asf/james-project/blob/6f76f9f8/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java index 44b666d..6547c58 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java +++ 
b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/RabbitMQChannelPool.java @@ -44,4 +44,6 @@ public interface RabbitMQChannelPool { <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException; + + void close() throws Exception; } http://git-wip-us.apache.org/repos/asf/james-project/blob/6f76f9f8/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java index 8381aac..e887f72 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java @@ -22,12 +22,15 @@ package org.apache.james.backend.rabbitmq; import java.io.IOException; import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; public class SimpleChannelPool implements RabbitMQChannelPool { private final Channel channel; + private final Connection connection; public SimpleChannelPool(RabbitMQConnectionFactory factory) throws IOException { - this.channel = factory.create().createChannel(); + this.connection = factory.create(); + this.channel = connection.createChannel(); } @Override @@ -39,4 +42,14 @@ public class SimpleChannelPool implements RabbitMQChannelPool { public synchronized <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException { f.execute(channel); } + + @Override + public void close() throws Exception { + if (channel.isOpen()) { + channel.close(); + } + if (connection.isOpen()) { + connection.close(); + } + } } 
http://git-wip-us.apache.org/repos/asf/james-project/blob/6f76f9f8/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java index c8762e7..3822e69 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQExtension.java @@ -38,6 +38,7 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, private DockerRabbitMQ rabbitMQ; private RabbitChannelPoolImpl rabbitChannelPoolImpl; + private SimpleChannelPool simpleChannelPool; @Override public void beforeAll(ExtensionContext context) { @@ -47,11 +48,14 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, @Override public void beforeEach(ExtensionContext extensionContext) throws Exception { - rabbitChannelPoolImpl = createRabbitChannelPool(); + RabbitMQConnectionFactory connectionFactory = createRabbitConnectionFactory(); + this.rabbitChannelPoolImpl = new RabbitChannelPoolImpl(connectionFactory); + this.simpleChannelPool = new SimpleChannelPool(connectionFactory); } @Override public void afterEach(ExtensionContext context) throws Exception { + simpleChannelPool.close(); rabbitChannelPoolImpl.close(); rabbitMQ.reset(); } @@ -71,24 +75,27 @@ public class RabbitMQExtension implements BeforeAllCallback, BeforeEachCallback, return rabbitMQ; } - public RabbitChannelPoolImpl getRabbitChannelPool() { + public RabbitChannelPoolImpl getRabbitChannelPoolImpl() { return rabbitChannelPoolImpl; } + public SimpleChannelPool getSimpleChannelPool() { + return simpleChannelPool; + } + public DockerRabbitMQ getRabbitMQ() { return 
rabbitMQ; } - private RabbitChannelPoolImpl createRabbitChannelPool() throws URISyntaxException { + private RabbitMQConnectionFactory createRabbitConnectionFactory() throws URISyntaxException { RabbitMQConfiguration rabbitMQConfiguration = RabbitMQConfiguration.builder() .amqpUri(rabbitMQ.amqpUri()) .managementUri(rabbitMQ.managementUri()) .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL) .build(); - RabbitMQConnectionFactory rabbitMQConnectionFactory = new RabbitMQConnectionFactory( + return new RabbitMQConnectionFactory( rabbitMQConfiguration, new AsyncRetryExecutor(Executors.newSingleThreadScheduledExecutor())); - return new RabbitChannelPoolImpl(rabbitMQConnectionFactory); } } http://git-wip-us.apache.org/repos/asf/james-project/blob/6f76f9f8/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java index ff85217..0ba57e1 100644 --- a/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java +++ b/backends-common/rabbitmq/src/test/java/org/apache/james/backend/rabbitmq/RabbitMQHealthCheckTest.java @@ -35,7 +35,7 @@ class RabbitMQHealthCheckTest { @BeforeEach void setUp() throws Exception { - healthCheck = new RabbitMQHealthCheck(rabbitMQExtension.getRabbitChannelPool()); + healthCheck = new RabbitMQHealthCheck(rabbitMQExtension.getRabbitChannelPoolImpl()); } @Test http://git-wip-us.apache.org/repos/asf/james-project/blob/6f76f9f8/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java ---------------------------------------------------------------------- diff --git 
a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java index 5645bef..860973e 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueConfigurationChangeTest.java @@ -106,7 +106,7 @@ class RabbitMQMailQueueConfigurationChangeTest { .managementUri(rabbitMQExtension.getRabbitMQ().managementUri()) .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL) .build(); - rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool()); + rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPoolImpl()); mqManagementApi = new RabbitMQManagementApi(rabbitMQConfiguration); } http://git-wip-us.apache.org/repos/asf/james-project/blob/6f76f9f8/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java index 909e787..1c94efb 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java @@ -121,7 +121,7 @@ public class RabbitMQMailQueueTest implements ManageableMailQueueContract, MailQ .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL) .build(); - RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool()); + RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getSimpleChannelPool()); 
RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( metricTestSystem.getSpyMetricFactory(), metricTestSystem.getSpyGaugeRegistry(), http://git-wip-us.apache.org/repos/asf/james-project/blob/6f76f9f8/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java ---------------------------------------------------------------------- diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java index 0fa4e5a..311bafd 100644 --- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java +++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMqMailQueueFactoryTest.java @@ -66,7 +66,7 @@ class RabbitMqMailQueueFactoryTest implements MailQueueFactoryContract<RabbitMQM .managementCredentials(DEFAULT_MANAGEMENT_CREDENTIAL) .build(); - RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPool()); + RabbitClient rabbitClient = new RabbitClient(rabbitMQExtension.getRabbitChannelPoolImpl()); RabbitMQMailQueueFactory.PrivateFactory factory = new RabbitMQMailQueueFactory.PrivateFactory( new NoopMetricFactory(), new NoopGaugeRegistry(), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
