JAMES-2551 SimpleChannelPool should provide resilient channels & connections
Before that, SimpleChannelPool has been failed in HealthCheckTest. RabbitMQ start and stop causes single final Connection & Channel being closed and never getting recovered, use AtomicReference to check whether connection or channel should be re-new if they are closed Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/0e8dd583 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/0e8dd583 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/0e8dd583 Branch: refs/heads/master Commit: 0e8dd5831cf31c77ae0a1b22947b00c99e898cbb Parents: 6f76f9f Author: duc <[email protected]> Authored: Thu Oct 11 13:55:04 2018 +0700 Committer: Benoit Tellier <[email protected]> Committed: Thu Oct 25 15:07:52 2018 +0700 ---------------------------------------------------------------------- .../backend/rabbitmq/SimpleChannelPool.java | 59 +++++++++++++++----- 1 file changed, 44 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/0e8dd583/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java ---------------------------------------------------------------------- diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java index e887f72..31d8530 100644 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java +++ b/backends-common/rabbitmq/src/main/java/org/apache/james/backend/rabbitmq/SimpleChannelPool.java @@ -19,37 +19,66 @@ package org.apache.james.backend.rabbitmq; -import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.PreDestroy; + +import com.github.fge.lambdas.Throwing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class SimpleChannelPool implements RabbitMQChannelPool { - private final Channel channel; - private final Connection connection; + private final AtomicReference<Channel> channelReference; + private final AtomicReference<Connection> connectionReference; + private final RabbitMQConnectionFactory connectionFactory; - public SimpleChannelPool(RabbitMQConnectionFactory factory) throws IOException { - this.connection = factory.create(); - this.channel = connection.createChannel(); + public SimpleChannelPool(RabbitMQConnectionFactory factory) { + this.connectionFactory = factory; + this.connectionReference = new AtomicReference<>(); + this.channelReference = new AtomicReference<>(); } @Override public synchronized <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException { - return f.execute(channel); + return f.execute(getResilientChannel()); } @Override public synchronized <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException { - f.execute(channel); + f.execute(getResilientChannel()); } + @PreDestroy @Override - public void close() throws Exception { - if (channel.isOpen()) { - channel.close(); - } - if (connection.isOpen()) { - connection.close(); - } + public synchronized void close() { + Optional.ofNullable(channelReference.get()) + .filter(Channel::isOpen) + .ifPresent(Throwing.<Channel>consumer(Channel::close).sneakyThrow()); + + Optional.ofNullable(connectionReference.get()) + .filter(Connection::isOpen) + .ifPresent(Throwing.<Connection>consumer(Connection::close).sneakyThrow()); + } + + private Connection getResilientConnection() { + return connectionReference.updateAndGet(this::getOpenConnection); + } + + private Connection getOpenConnection(Connection checkedConnection) { + return Optional.ofNullable(checkedConnection) + .filter(Connection::isOpen) + .orElseGet(connectionFactory::create); + } + + private Channel getResilientChannel() { + return channelReference.updateAndGet(Throwing.unaryOperator(this::getOpenChannel)); + } + + private Channel getOpenChannel(Channel checkedChannel) { + return Optional.ofNullable(checkedChannel) + .filter(Channel::isOpen) + .orElseGet(Throwing.supplier(() -> getResilientConnection().createChannel()) + .sneakyThrow()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
