This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d4aadcb14933472fa8fa3e4dbd8171ae90503421 Author: Quan Tran <[email protected]> AuthorDate: Wed Dec 4 14:39:23 2024 +0700 JAMES-3605 TerminationReconnectionHandler should not create the termination queue upon reconnection Otherwise error: ``` com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-queue-version' for queue 'terminationSubscriber291d56fc-1e2d-4fb3-a455-bad297263f43' in vhost '/': received none but current is the value '2' of type 'signedint', class-id=50, method-id=10) ``` And cause the remaining reconnection handlers to fail. --- .../TerminationReconnectionHandler.java | 37 ++-------------------- 1 file changed, 2 insertions(+), 35 deletions(-) diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java index f21e4061fa..121d2a3dfe 100644 --- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java +++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java @@ -19,58 +19,25 @@ package org.apache.james.task.eventsourcing.distributed; -import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; -import static org.apache.james.backends.rabbitmq.Constants.DURABLE; -import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; -import static org.apache.james.backends.rabbitmq.Constants.evaluateAutoDelete; -import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable; -import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive; - import jakarta.inject.Inject; -import org.apache.james.backends.rabbitmq.QueueArguments; -import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.reactivestreams.Publisher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import reactor.core.publisher.Mono; public class TerminationReconnectionHandler implements SimpleConnectionPool.ReconnectionHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(TerminationReconnectionHandler.class); - - private final TerminationQueueName queueName; private final RabbitMQTerminationSubscriber terminationSubscriber; - private final RabbitMQConfiguration configuration; @Inject - public TerminationReconnectionHandler(TerminationQueueName queueName, RabbitMQTerminationSubscriber terminationSubscriber, RabbitMQConfiguration configuration) { - this.queueName = queueName; + public TerminationReconnectionHandler(RabbitMQTerminationSubscriber terminationSubscriber) { this.terminationSubscriber = terminationSubscriber; - this.configuration = configuration; } @Override public Publisher<Void> handleReconnection(Connection connection) { - return Mono.fromRunnable(() -> createTerminationQueue(connection)) - .then(Mono.fromRunnable(terminationSubscriber::restart)); - } - - private void createTerminationQueue(Connection connection) { - try (Channel channel = connection.createChannel()) { - QueueArguments.Builder builder = QueueArguments.builder(); - configuration.getQueueTTL().ifPresent(builder::queueTTL); - channel.queueDeclare(queueName.asString(), - evaluateDurable(!DURABLE, configuration.isQuorumQueuesUsed()), - evaluateExclusive(!EXCLUSIVE, configuration.isQuorumQueuesUsed()), - evaluateAutoDelete(!AUTO_DELETE, configuration.isQuorumQueuesUsed()), - builder.build()); - } catch (Exception e) { - LOGGER.error("Error recovering connection", e); - } + return Mono.fromRunnable(terminationSubscriber::restart); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
