This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7423ee71cba86f767d82c7d3dcca50af9d574caa Author: Benoit Tellier <[email protected]> AuthorDate: Thu Oct 31 13:46:15 2019 +0700 JAMES-2937 Get rid of simple channelPool --- .../backends/rabbitmq/RabbitMQChannelPool.java | 55 --------- .../james/backends/rabbitmq/SimpleChannelPool.java | 128 --------------------- .../james/modules/rabbitmq/RabbitMQModule.java | 5 - 3 files changed, 188 deletions(-) diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQChannelPool.java deleted file mode 100644 index a544efb..0000000 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/RabbitMQChannelPool.java +++ /dev/null @@ -1,55 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.backends.rabbitmq; - -import com.rabbitmq.client.Channel; -import reactor.core.publisher.Flux; -import reactor.rabbitmq.AcknowledgableDelivery; - -public interface RabbitMQChannelPool { - class ConnectionFailedException extends RuntimeException { - public ConnectionFailedException(Throwable cause) { - super(cause); - } - } - - @FunctionalInterface - interface RabbitFunction<T, E extends Throwable> { - T execute(Channel channel) throws E; - } - - @FunctionalInterface - interface RabbitConsumer<E extends Throwable> { - void execute(Channel channel) throws E; - } - - <T, E extends Throwable> T execute(RabbitFunction<T, E> f) - throws E, ConnectionFailedException; - - - <E extends Throwable> void execute(RabbitConsumer<E> f) - throws E, ConnectionFailedException; - - Flux<AcknowledgableDelivery> receive(String queueName); - - boolean tryConnection(); - - void close() throws Exception; -} diff --git a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java b/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java deleted file mode 100644 index c322af7..0000000 --- a/backends-common/rabbitmq/src/main/java/org/apache/james/backends/rabbitmq/SimpleChannelPool.java +++ /dev/null @@ -1,128 +0,0 @@ -/**************************************************************** - * Licensed to the Apache Software Foundation (ASF) under one * - * or more contributor license agreements. See the NOTICE file * - * distributed with this work for additional information * - * regarding copyright ownership. The ASF licenses this file * - * to you under the Apache License, Version 2.0 (the * - * "License"); you may not use this file except in compliance * - * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * - * Unless required by applicable law or agreed to in writing, * - * software distributed under the License is distributed on an * - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * - * KIND, either express or implied. See the License for the * - * specific language governing permissions and limitations * - * under the License. * - ****************************************************************/ - -package org.apache.james.backends.rabbitmq; - -import java.io.IOException; -import java.time.Duration; -import java.util.Optional; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; - -import javax.annotation.PreDestroy; -import javax.inject.Inject; - -import com.github.fge.lambdas.Throwing; -import com.google.common.annotations.VisibleForTesting; -import com.rabbitmq.client.Channel; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; -import reactor.rabbitmq.AcknowledgableDelivery; -import reactor.rabbitmq.RabbitFlux; -import reactor.rabbitmq.Receiver; -import reactor.rabbitmq.ReceiverOptions; - -public class SimpleChannelPool implements RabbitMQChannelPool { - private final AtomicReference<Channel> channelReference; - private final Receiver rabbitFlux; - private final SimpleConnectionPool connectionPool; - - @Inject - @VisibleForTesting - SimpleChannelPool(SimpleConnectionPool connectionPool) { - this.connectionPool = connectionPool; - this.channelReference = new AtomicReference<>(); - this.rabbitFlux = RabbitFlux - .createReceiver(new ReceiverOptions().connectionMono(connectionPool.getResilientConnection())); - } - - @Override - public Flux<AcknowledgableDelivery> receive(String queueName) { - return rabbitFlux.consumeManualAck(queueName); - } - - @Override - public <T, E extends Throwable> T execute(RabbitFunction<T, E> f) throws E, ConnectionFailedException { - return f.execute(getResilientChannel().block()); - } - - @Override - public <E extends Throwable> void execute(RabbitConsumer<E> f) throws E, ConnectionFailedException { - f.execute(getResilientChannel().block()); - } - - @PreDestroy - @Override - public void close() { - Optional.ofNullable(channelReference.get()) - .filter(Channel::isOpen) - .ifPresent(Throwing.<Channel>consumer(Channel::close).orDoNothing()); - - try { - rabbitFlux.close(); - } catch (Throwable ignored) { - //ignore exception during close - } - } - - private Mono<Channel> getResilientChannel() { - int numRetries = 100; - Duration initialDelay = Duration.ofMillis(100); - Duration forever = Duration.ofMillis(Long.MAX_VALUE); - return Mono.defer(this::getOpenChannel) - .retryBackoff(numRetries, initialDelay, forever, Schedulers.boundedElastic()); - } - - private Mono<Channel> getOpenChannel() { - Channel previous = channelReference.get(); - return Mono.justOrEmpty(previous) - .publishOn(Schedulers.boundedElastic()) - .filter(Channel::isOpen) - .switchIfEmpty(connectionPool.getResilientConnection() - .flatMap(connection -> Mono.fromCallable(connection::createChannel))) - .flatMap(current -> replaceCurrentChannel(previous, current)) - .onErrorMap(t -> new RuntimeException("unable to create and register a new Channel", t)); - } - - private Mono<Channel> replaceCurrentChannel(Channel previous, Channel current) { - if (channelReference.compareAndSet(previous, current)) { - return Mono.just(current); - } else { - try { - current.close(); - } catch (IOException | TimeoutException e) { - //error below - } - return Mono.error(new RuntimeException("unable to create and register a new Channel")); - } - } - - @Override - public boolean tryConnection() { - try { - return connectionPool.tryConnection() && - getOpenChannel() - .blockOptional() - .isPresent(); - } catch (Throwable t) { - return false; - } - } -} diff --git a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java index 4226aea..e887d6a 100644 --- a/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java +++ b/server/container/guice/rabbitmq/src/main/java/org/apache/james/modules/rabbitmq/RabbitMQModule.java @@ -26,11 +26,9 @@ import javax.inject.Singleton; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.james.backends.cassandra.components.CassandraModule; -import org.apache.james.backends.rabbitmq.RabbitMQChannelPool; import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.RabbitMQHealthCheck; import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; -import org.apache.james.backends.rabbitmq.SimpleChannelPool; import org.apache.james.core.healthcheck.HealthCheck; import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule; import org.apache.james.queue.api.MailQueueFactory; @@ -79,9 +77,6 @@ public class RabbitMQModule extends AbstractModule { bind(CassandraMailQueueMailDelete.class).in(Scopes.SINGLETON); bind(CassandraMailQueueMailStore.class).in(Scopes.SINGLETON); - bind(SimpleChannelPool.class).in(Scopes.SINGLETON); - bind(RabbitMQChannelPool.class).to(SimpleChannelPool.class); - Multibinder<CassandraModule> cassandraModuleBinder = Multibinder.newSetBinder(binder(), CassandraModule.class); cassandraModuleBinder.addBinding().toInstance(CassandraMailQueueViewModule.MODULE); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
