This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b6ed512b41fae241e30b5691f875c38a19e78195
Author: Quan Tran <[email protected]>
AuthorDate: Wed Dec 4 14:57:16 2024 +0700

    JAMES-3605 Implement DeletedMessageVaultWorkQueueReconnectionHandler
    
    So that the consumer of the deleted message vault queue can reconnect if needed.
---
 ...edMessageVaultWorkQueueReconnectionHandler.java | 43 ++++++++++++++++++++++
 ...ributedDeletedMessageVaultDeletionCallback.java | 24 +++++++++---
 .../DistributedDeletedMessageVaultModule.java      |  4 ++
 3 files changed, 66 insertions(+), 5 deletions(-)

diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
new file mode 100644
index 0000000000..0893a66d34
--- /dev/null
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DeletedMessageVaultWorkQueueReconnectionHandler.java
@@ -0,0 +1,43 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.mailbox;
+
+import jakarta.inject.Inject;
+
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.reactivestreams.Publisher;
+
+import com.rabbitmq.client.Connection;
+
+import reactor.core.publisher.Mono;
+
+public class DeletedMessageVaultWorkQueueReconnectionHandler implements 
SimpleConnectionPool.ReconnectionHandler {
+    private final DistributedDeletedMessageVaultDeletionCallback 
distributedDeletedMessageVaultDeletionCallback;
+
+    @Inject
+    public 
DeletedMessageVaultWorkQueueReconnectionHandler(DistributedDeletedMessageVaultDeletionCallback
 distributedDeletedMessageVaultDeletionCallback) {
+        this.distributedDeletedMessageVaultDeletionCallback = 
distributedDeletedMessageVaultDeletionCallback;
+    }
+
+    @Override
+    public Publisher<Void> handleReconnection(Connection connection) {
+        return 
Mono.fromRunnable(distributedDeletedMessageVaultDeletionCallback::restart);
+    }
+}
diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
index 6be7246339..6bed7178c5 100644
--- 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
@@ -35,6 +35,7 @@ import jakarta.inject.Inject;
 
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
 import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
 import org.apache.james.blob.api.BlobId;
 import org.apache.james.core.Username;
 import org.apache.james.lifecycle.api.Startable;
@@ -168,7 +169,7 @@ public class DistributedDeletedMessageVaultDeletionCallback 
implements DeleteMes
     private final MailboxId.Factory mailboxIdFactory;
     private final MessageId.Factory messageIdFactory;
     private final BlobId.Factory blobIdFactory;
-    private Receiver receiver;
+    private final ReceiverProvider receiverProvider;
     private Disposable disposable;
 
     @Inject
@@ -178,7 +179,8 @@ public class DistributedDeletedMessageVaultDeletionCallback 
implements DeleteMes
                                                           
DeletedMessageVaultDeletionCallback callback,
                                                           MailboxId.Factory 
mailboxIdFactory,
                                                           MessageId.Factory 
messageIdFactory,
-                                                          BlobId.Factory 
blobIdFactory) {
+                                                          BlobId.Factory 
blobIdFactory,
+                                                          ReceiverProvider 
receiverProvider) {
         this.sender = sender;
         this.rabbitMQConfiguration = rabbitMQConfiguration;
         this.callback = callback;
@@ -187,6 +189,7 @@ public class DistributedDeletedMessageVaultDeletionCallback 
implements DeleteMes
         this.blobIdFactory = blobIdFactory;
         this.objectMapper = new ObjectMapper();
         this.channelPool = channelPool;
+        this.receiverProvider = receiverProvider;
     }
 
     public void init() {
@@ -215,17 +218,28 @@ public class 
DistributedDeletedMessageVaultDeletionCallback implements DeleteMes
             .then()
             .block();
 
-        receiver = channelPool.createReceiver();
-        disposable = receiver.consumeManualAck(QUEUE, new 
ConsumeOptions().qos(QOS))
+        disposable = consumeDeletedMessageVaultWorkQueue();
+    }
+
+    private Disposable consumeDeletedMessageVaultWorkQueue() {
+        return Flux.using(
+                receiverProvider::createReceiver,
+                receiver -> receiver.consumeManualAck(QUEUE, new 
ConsumeOptions().qos(QOS)),
+                Receiver::close)
             .flatMap(this::handleMessage)
             .subscribeOn(Schedulers.boundedElastic())
             .subscribe();
     }
 
+    public void restart() {
+        Disposable previousConsumer = disposable;
+        disposable = consumeDeletedMessageVaultWorkQueue();
+        previousConsumer.dispose();
+    }
+
     @PreDestroy
     public void stop() {
         Optional.ofNullable(disposable).ifPresent(Disposable::dispose);
-        Optional.ofNullable(receiver).ifPresent(Receiver::close);
     }
 
     private Mono<Void> handleMessage(AcknowledgableDelivery delivery) {
diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
index 68c1744ea9..2a2078ed49 100644
--- 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
@@ -20,6 +20,7 @@
 package org.apache.james.modules.mailbox;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
 import org.apache.james.mailbox.cassandra.DeleteMessageListener;
 import org.apache.james.modules.vault.DeletedMessageVaultModule;
 import org.apache.james.utils.InitializationOperation;
@@ -68,6 +69,9 @@ public class DistributedDeletedMessageVaultModule extends 
AbstractModule {
             .addBinding()
             .to(DistributedDeletedMessageVaultDeletionCallback.class);
         
bind(DistributedDeletedMessageVaultDeletionCallback.class).in(Scopes.SINGLETON);
+
+        Multibinder<SimpleConnectionPool.ReconnectionHandler> 
reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), 
SimpleConnectionPool.ReconnectionHandler.class);
+        
reconnectionHandlerMultibinder.addBinding().to(DeletedMessageVaultWorkQueueReconnectionHandler.class);
     }
 
     @ProvidesIntoSet


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to