This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d4aadcb14933472fa8fa3e4dbd8171ae90503421
Author: Quan Tran <[email protected]>
AuthorDate: Wed Dec 4 14:39:23 2024 +0700

    JAMES-3605 TerminationReconnectionHandler should not create the termination 
queue upon reconnection
    
    Otherwise error:
    ```
    com.rabbitmq.client.AlreadyClosedException: channel is already closed due 
to channel error; protocol method: #method<channel.close>(reply-code=406, 
reply-text=PRECONDITION_FAILED - inequivalent arg 'x-queue-version' for queue 
'terminationSubscriber291d56fc-1e2d-4fb3-a455-bad297263f43' in vhost '/': 
received none but current is the value '2' of type 'signedint', class-id=50, 
method-id=10)
    ```
    
    And cause the remaining reconnection handlers to fail.
---
 .../TerminationReconnectionHandler.java            | 37 ++--------------------
 1 file changed, 2 insertions(+), 35 deletions(-)

diff --git 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
index f21e4061fa..121d2a3dfe 100644
--- 
a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
+++ 
b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TerminationReconnectionHandler.java
@@ -19,58 +19,25 @@
 
 package org.apache.james.task.eventsourcing.distributed;
 
-import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
-import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
-import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
-import static org.apache.james.backends.rabbitmq.Constants.evaluateAutoDelete;
-import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable;
-import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive;
-
 import jakarta.inject.Inject;
 
-import org.apache.james.backends.rabbitmq.QueueArguments;
-import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.reactivestreams.Publisher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 
 import reactor.core.publisher.Mono;
 
 public class TerminationReconnectionHandler implements 
SimpleConnectionPool.ReconnectionHandler {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(TerminationReconnectionHandler.class);
-
-    private final TerminationQueueName queueName;
     private final RabbitMQTerminationSubscriber terminationSubscriber;
-    private final RabbitMQConfiguration configuration;
 
     @Inject
-    public TerminationReconnectionHandler(TerminationQueueName queueName, 
RabbitMQTerminationSubscriber terminationSubscriber, RabbitMQConfiguration 
configuration) {
-        this.queueName = queueName;
+    public TerminationReconnectionHandler(RabbitMQTerminationSubscriber 
terminationSubscriber) {
         this.terminationSubscriber = terminationSubscriber;
-        this.configuration = configuration;
     }
 
     @Override
     public Publisher<Void> handleReconnection(Connection connection) {
-        return Mono.fromRunnable(() -> createTerminationQueue(connection))
-            .then(Mono.fromRunnable(terminationSubscriber::restart));
-    }
-
-    private void createTerminationQueue(Connection connection) {
-        try (Channel channel = connection.createChannel()) {
-            QueueArguments.Builder builder = QueueArguments.builder();
-            configuration.getQueueTTL().ifPresent(builder::queueTTL);
-            channel.queueDeclare(queueName.asString(),
-                evaluateDurable(!DURABLE, configuration.isQuorumQueuesUsed()),
-                evaluateExclusive(!EXCLUSIVE, 
configuration.isQuorumQueuesUsed()),
-                evaluateAutoDelete(!AUTO_DELETE, 
configuration.isQuorumQueuesUsed()),
-                builder.build());
-        } catch (Exception e) {
-            LOGGER.error("Error recovering connection", e);
-        }
+        return Mono.fromRunnable(terminationSubscriber::restart);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to