This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch dtm-workqueue
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7082160d7a6f95fc143e32efed21539eac3f4a80
Author: Benoit TELLIER <btell...@linagora.com>
AuthorDate: Mon Mar 18 22:07:05 2024 +0100

    [ENHANCEMENT] Workqueue for the deleted message vault
    
    Deleted message vault copy can be long, doing it
    asynchronously would make the feature more reliable.
---
 .../mailbox/cassandra/DeleteMessageListener.java   |  67 +++++-
 .../DeletedMessageVaultDeletionCallback.java       |  24 +-
 .../james/CassandraRabbitMQJamesServerMain.java    |   4 +-
 .../james/DistributedPOP3JamesServerMain.java      |   4 +-
 ...ributedDeletedMessageVaultDeletionCallback.java | 262 +++++++++++++++++++++
 .../DistributedDeletedMessageVaultModule.java      |  69 ++++++
 6 files changed, 412 insertions(+), 18 deletions(-)

diff --git 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index 504b07c2f5..0b091ba330 100644
--- 
a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ 
b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -23,6 +23,7 @@ import static 
org.apache.james.backends.cassandra.init.configuration.JamesExecut
 import static 
org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles.ConsistencyChoice.WEAK;
 import static org.apache.james.util.FunctionalUtils.negate;
 
+import java.util.Date;
 import java.util.Optional;
 import java.util.Set;
 
@@ -30,6 +31,7 @@ import javax.inject.Inject;
 
 import 
org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import 
org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles;
+import org.apache.james.blob.api.BlobId;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.core.Username;
 import org.apache.james.events.Event;
@@ -59,6 +61,7 @@ import 
org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mailbox.model.MessageMetaData;
 import org.apache.james.mailbox.model.MessageRange;
 import org.apache.james.mailbox.store.mail.MessageMapper;
@@ -86,9 +89,71 @@ public class DeleteMessageListener implements 
EventListener.ReactiveGroupEventLi
 
     }
 
+    public static class DelectedMessageCopyCommand {
+        public static DelectedMessageCopyCommand of(MessageRepresentation 
message, MailboxId mailboxId, Username owner) {
+            return new DelectedMessageCopyCommand(message.getMessageId(), 
mailboxId, owner, message.getInternalDate(),
+                message.getSize(), !message.getAttachments().isEmpty(), 
message.getHeaderId(), message.getBodyId());
+        }
+
+        private final MessageId messageId;
+        private final MailboxId mailboxId;
+        private final Username owner;
+        private final Date internalDate;
+        private final long size;
+        private final boolean hasAttachments;
+        private final BlobId headerId;
+        private final BlobId bodyId;
+
+        public DelectedMessageCopyCommand(MessageId messageId, MailboxId 
mailboxId, Username owner, Date internalDate, long size, boolean 
hasAttachments, BlobId headerId, BlobId bodyId) {
+            this.messageId = messageId;
+            this.mailboxId = mailboxId;
+            this.owner = owner;
+            this.internalDate = internalDate;
+            this.size = size;
+            this.hasAttachments = hasAttachments;
+            this.headerId = headerId;
+            this.bodyId = bodyId;
+        }
+
+        public Username getOwner() {
+            return owner;
+        }
+
+        public MessageId getMessageId() {
+            return messageId;
+        }
+
+        public MailboxId getMailboxId() {
+            return mailboxId;
+        }
+
+        public Date getInternalDate() {
+            return internalDate;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public boolean hasAttachments() {
+            return hasAttachments;
+        }
+
+        public BlobId getHeaderId() {
+            return headerId;
+        }
+
+        public BlobId getBodyId() {
+            return bodyId;
+        }
+    }
+
     @FunctionalInterface
     public interface DeletionCallback {
-        Mono<Void> forMessage(MessageRepresentation message, MailboxId 
mailboxId, Username owner);
+        default Mono<Void> forMessage(MessageRepresentation message, MailboxId 
mailboxId, Username owner) {
+            return forMessage(DelectedMessageCopyCommand.of(message, 
mailboxId, owner));
+        }
+        Mono<Void> forMessage(DelectedMessageCopyCommand copyCommand);
     }
 
     private final CassandraThreadDAO threadDAO;
diff --git 
a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
 
b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
index b23a895f20..611fa69779 100644
--- 
a/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
+++ 
b/mailbox/plugin/deleted-messages-vault-cassandra/src/main/java/org/apache/james/vault/metadata/DeletedMessageVaultDeletionCallback.java
@@ -34,10 +34,7 @@ import javax.inject.Inject;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.core.MailAddress;
 import org.apache.james.core.MaybeSender;
-import org.apache.james.core.Username;
 import org.apache.james.mailbox.cassandra.DeleteMessageListener;
-import org.apache.james.mailbox.cassandra.mail.MessageRepresentation;
-import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MessageId;
 import org.apache.james.mime4j.MimeIOException;
 import org.apache.james.mime4j.codec.DecodeMonitor;
@@ -71,29 +68,30 @@ public class DeletedMessageVaultDeletionCallback implements 
DeleteMessageListene
     }
 
     @Override
-    public Mono<Void> forMessage(MessageRepresentation message, MailboxId 
mailboxId, Username owner) {
-        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), 
message.getHeaderId(), BlobStore.StoragePolicy.LOW_COST))
+    public Mono<Void> 
forMessage(DeleteMessageListener.DelectedMessageCopyCommand copyCommand) {
+        return Mono.from(blobStore.readBytes(blobStore.getDefaultBucketName(), 
copyCommand.getHeaderId(), BlobStore.StoragePolicy.LOW_COST))
             .flatMap(bytes -> {
-                Optional<Message> mimeMessage = parseMessage(new 
ByteArrayInputStream(bytes), message.getMessageId());
+                Optional<Message> mimeMessage = parseMessage(new 
ByteArrayInputStream(bytes), copyCommand.getMessageId());
                 DeletedMessage deletedMessage = DeletedMessage.builder()
-                    .messageId(message.getMessageId())
-                    .originMailboxes(mailboxId)
-                    .user(owner)
-                    
.deliveryDate(ZonedDateTime.ofInstant(message.getInternalDate().toInstant(), 
ZoneOffset.UTC))
+                    .messageId(copyCommand.getMessageId())
+                    .originMailboxes(copyCommand.getMailboxId())
+                    .user(copyCommand.getOwner())
+                    
.deliveryDate(ZonedDateTime.ofInstant(copyCommand.getInternalDate().toInstant(),
 ZoneOffset.UTC))
                     .deletionDate(ZonedDateTime.ofInstant(clock.instant(), 
ZoneOffset.UTC))
                     .sender(retrieveSender(mimeMessage))
                     .recipients(retrieveRecipients(mimeMessage))
-                    .hasAttachment(!message.getAttachments().isEmpty())
-                    .size(message.getSize())
+                    .hasAttachment(copyCommand.hasAttachments())
+                    .size(copyCommand.getSize())
                     .subject(mimeMessage.map(Message::getSubject))
                     .build();
 
-                return 
Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), 
message.getBodyId(), BlobStore.StoragePolicy.LOW_COST))
+                return 
Mono.from(blobStore.readReactive(blobStore.getDefaultBucketName(), 
copyCommand.getBodyId(), BlobStore.StoragePolicy.LOW_COST))
                     .map(bodyStream -> new SequenceInputStream(new 
ByteArrayInputStream(bytes), bodyStream))
                     .flatMap(bodyStream -> 
Mono.from(deletedMessageVault.append(deletedMessage, bodyStream)));
             });
     }
 
+
     private Optional<Message> parseMessage(InputStream inputStream, MessageId 
messageId) {
         DefaultMessageBuilder messageBuilder = new DefaultMessageBuilder();
         messageBuilder.setMimeEntityConfig(MimeConfig.PERMISSIVE);
diff --git 
a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
 
b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
index d5b27726c9..794d210337 100644
--- 
a/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
+++ 
b/server/apps/distributed-app/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
@@ -51,12 +51,12 @@ import 
org.apache.james.modules.data.CassandraVacationModule;
 import org.apache.james.modules.event.JMAPEventBusModule;
 import org.apache.james.modules.event.RabbitMQEventBusModule;
 import org.apache.james.modules.eventstore.CassandraEventStoreModule;
-import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule;
 import org.apache.james.modules.mailbox.CassandraMailboxModule;
 import org.apache.james.modules.mailbox.CassandraMailboxQuotaLegacyModule;
 import org.apache.james.modules.mailbox.CassandraMailboxQuotaModule;
 import org.apache.james.modules.mailbox.CassandraQuotaMailingModule;
 import org.apache.james.modules.mailbox.CassandraSessionModule;
+import org.apache.james.modules.mailbox.DistributedDeletedMessageVaultModule;
 import org.apache.james.modules.mailbox.TikaMailboxModule;
 import org.apache.james.modules.mailrepository.CassandraMailRepositoryModule;
 import org.apache.james.modules.metrics.CassandraMetricsModule;
@@ -228,7 +228,7 @@ public class CassandraRabbitMQJamesServerMain implements 
JamesServerMain {
     private static Module chooseDeletedMessageVault(VaultConfiguration 
vaultConfiguration) {
         if (vaultConfiguration.isEnabled()) {
             return Modules.combine(
-                new CassandraDeletedMessageVaultModule(),
+                new DistributedDeletedMessageVaultModule(),
                 new DeletedMessageVaultRoutesModule());
         }
         return binder -> {
diff --git 
a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
 
b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
index d094aaab94..d42cc35d11 100644
--- 
a/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
+++ 
b/server/apps/distributed-pop3-app/src/main/java/org/apache/james/DistributedPOP3JamesServerMain.java
@@ -59,11 +59,11 @@ import org.apache.james.modules.event.JMAPEventBusModule;
 import org.apache.james.modules.event.RabbitMQEventBusModule;
 import org.apache.james.modules.eventstore.CassandraEventStoreModule;
 import org.apache.james.modules.mailbox.CassandraBlobStoreDependenciesModule;
-import org.apache.james.modules.mailbox.CassandraDeletedMessageVaultModule;
 import org.apache.james.modules.mailbox.CassandraMailboxModule;
 import org.apache.james.modules.mailbox.CassandraMailboxQuotaLegacyModule;
 import org.apache.james.modules.mailbox.CassandraMailboxQuotaModule;
 import org.apache.james.modules.mailbox.CassandraSessionModule;
+import org.apache.james.modules.mailbox.DistributedDeletedMessageVaultModule;
 import org.apache.james.modules.mailbox.TikaMailboxModule;
 import org.apache.james.modules.mailrepository.CassandraMailRepositoryModule;
 import org.apache.james.modules.metrics.CassandraMetricsModule;
@@ -218,7 +218,7 @@ public class DistributedPOP3JamesServerMain implements 
JamesServerMain {
     private static Module chooseDeletedMessageVault(VaultConfiguration 
vaultConfiguration) {
         if (vaultConfiguration.isEnabled()) {
             return Modules.combine(
-                new CassandraDeletedMessageVaultModule(),
+                new DistributedDeletedMessageVaultModule(),
                 new DeletedMessageVaultRoutesModule());
         }
         return binder -> {
diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
new file mode 100644
index 0000000000..282f9007b4
--- /dev/null
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
@@ -0,0 +1,262 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.mailbox;
+
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.ALLOW_QUORUM;
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+
+import java.util.Date;
+import java.util.Optional;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.blob.api.BlobId;
+import org.apache.james.core.Username;
+import org.apache.james.mailbox.cassandra.DeleteMessageListener;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.vault.metadata.DeletedMessageVaultDeletionCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.AMQP;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.rabbitmq.AcknowledgableDelivery;
+import reactor.rabbitmq.BindingSpecification;
+import reactor.rabbitmq.ConsumeOptions;
+import reactor.rabbitmq.ExchangeSpecification;
+import reactor.rabbitmq.OutboundMessage;
+import reactor.rabbitmq.QueueSpecification;
+import reactor.rabbitmq.Receiver;
+import reactor.rabbitmq.Sender;
+
+public class DistributedDeletedMessageVaultDeletionCallback implements 
DeleteMessageListener.DeletionCallback {
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(DistributedDeletedMessageVaultDeletionCallback.class);
+
+    private static class CopyCommandDTO {
+        public static CopyCommandDTO 
of(DeleteMessageListener.DelectedMessageCopyCommand command) {
+            return new CopyCommandDTO(
+                command.getMessageId().serialize(),
+                command.getMailboxId().serialize(),
+                command.getOwner().asString(),
+                command.getInternalDate(),
+                command.getSize(),
+                command.hasAttachments(),
+                command.getHeaderId().asString(),
+                command.getBodyId().asString());
+        }
+
+        private final String messageId;
+        private final String mailboxId;
+        private final String owner;
+        private final Date internalDate;
+        private final long size;
+        private final boolean hasAttachments;
+        private final String headerId;
+        private final String bodyId;
+
+        @JsonCreator
+        public CopyCommandDTO(@JsonProperty("messageId") String messageId,
+                              @JsonProperty("mailboxId") String mailboxId,
+                              @JsonProperty("owner") String owner,
+                              @JsonProperty("internalDate") Date internalDate,
+                              @JsonProperty("size") long size,
+                              @JsonProperty("hasAttachments") boolean 
hasAttachments,
+                              @JsonProperty("headerId") String headerId,
+                              @JsonProperty("bodyId") String bodyId) {
+            this.messageId = messageId;
+            this.mailboxId = mailboxId;
+            this.owner = owner;
+            this.internalDate = internalDate;
+            this.size = size;
+            this.hasAttachments = hasAttachments;
+            this.headerId = headerId;
+            this.bodyId = bodyId;
+        }
+
+        public String getMessageId() {
+            return messageId;
+        }
+
+        public String getMailboxId() {
+            return mailboxId;
+        }
+
+        public String getOwner() {
+            return owner;
+        }
+
+        public Date getInternalDate() {
+            return internalDate;
+        }
+
+        public long getSize() {
+            return size;
+        }
+
+        public boolean isHasAttachments() {
+            return hasAttachments;
+        }
+
+        public String getHeaderId() {
+            return headerId;
+        }
+
+        public String getBodyId() {
+            return bodyId;
+        }
+
+        @JsonIgnore
+        DeleteMessageListener.DelectedMessageCopyCommand 
asPojo(MailboxId.Factory mailboxIdFactory, MessageId.Factory messageIdFactory, 
BlobId.Factory blobIdFactory) {
+            return new 
DeleteMessageListener.DelectedMessageCopyCommand(messageIdFactory.fromString(messageId),
+                mailboxIdFactory.fromString(messageId),
+                Username.of(owner),
+                internalDate,
+                size,
+                hasAttachments,
+                blobIdFactory.from(headerId),
+                blobIdFactory.from(bodyId));
+        }
+    }
+
+    private static final String EXCHANGE = "deleted-message-vault";
+    private static final String QUEUE = "deleted-message-vault-work-queue";
+    private static final String DEAD_LETTER = QUEUE + "-dead-letter";
+    private static final boolean REQUEUE = true;
+    private static final int QOS = 5;
+
+    private final ReactorRabbitMQChannelPool channelPool;
+    private final RabbitMQConfiguration rabbitMQConfiguration;
+    private final DeletedMessageVaultDeletionCallback callback;
+    private final Sender sender;
+    private final ObjectMapper objectMapper;
+    private final MailboxId.Factory mailboxIdFactory;
+    private final MessageId.Factory messageIdFactory;
+    private final BlobId.Factory blobIdFactory;
+    private Receiver receiver;
+    private Disposable disposable;
+
+    @Inject
+    public DistributedDeletedMessageVaultDeletionCallback(Sender sender,
+                                                          
ReactorRabbitMQChannelPool channelPool,
+                                                          
RabbitMQConfiguration rabbitMQConfiguration,
+                                                          
DeletedMessageVaultDeletionCallback callback,
+                                                          MailboxId.Factory 
mailboxIdFactory,
+                                                          MessageId.Factory 
messageIdFactory,
+                                                          BlobId.Factory 
blobIdFactory) {
+        this.sender = sender;
+        this.rabbitMQConfiguration = rabbitMQConfiguration;
+        this.callback = callback;
+        this.mailboxIdFactory = mailboxIdFactory;
+        this.messageIdFactory = messageIdFactory;
+        this.blobIdFactory = blobIdFactory;
+        this.objectMapper = new ObjectMapper();
+        this.channelPool = channelPool;
+    }
+
+    public void init() {
+        Flux.concat(
+                sender.declareExchange(ExchangeSpecification.exchange(EXCHANGE)
+                    .durable(DURABLE)
+                    .type(DIRECT_EXCHANGE)),
+                sender.declareQueue(QueueSpecification.queue(DEAD_LETTER)
+                    .durable(DURABLE)
+                    .exclusive(!EXCLUSIVE)
+                    .autoDelete(!AUTO_DELETE)
+                    
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!ALLOW_QUORUM)
+                        .deadLetter(DEAD_LETTER)
+                        .build())),
+                sender.declareQueue(QueueSpecification.queue(QUEUE)
+                    .durable(DURABLE)
+                    .exclusive(!EXCLUSIVE)
+                    .autoDelete(!AUTO_DELETE)
+                    
.arguments(rabbitMQConfiguration.workQueueArgumentsBuilder(!ALLOW_QUORUM).build())),
+                sender.bind(BindingSpecification.binding()
+                    .exchange(EXCHANGE)
+                    .queue(QUEUE)
+                    .routingKey(EMPTY_ROUTING_KEY)))
+            .then()
+            .block();
+
+        receiver = channelPool.createReceiver();
+        disposable = receiver.consumeManualAck(QUEUE, new 
ConsumeOptions().qos(QOS))
+            .map(delivery -> Mono.fromCallable(delivery::getBody)
+                .flatMap(bytes -> handleMessage(delivery, bytes)))
+            .subscribeOn(Schedulers.boundedElastic())
+            .subscribe();
+    }
+
+    public void stop() {
+        Optional.ofNullable(disposable).ifPresent(Disposable::dispose);
+        Optional.ofNullable(receiver).ifPresent(Receiver::close);
+    }
+
+    private Mono<Void> handleMessage(AcknowledgableDelivery delivery, byte[] 
bytes) {
+        try {
+            CopyCommandDTO copyCommandDTO = objectMapper.readValue(bytes, 
CopyCommandDTO.class);
+
+            return callback.forMessage(copyCommandDTO.asPojo(mailboxIdFactory, 
messageIdFactory, blobIdFactory))
+                .doOnError(e -> {
+                    LOGGER.error("Failed executing deletion callback for {}", 
copyCommandDTO.messageId, e);
+                    delivery.nack(REQUEUE);
+                })
+                .doOnSuccess(any -> delivery.ack())
+                .doOnCancel(() -> delivery.nack(REQUEUE));
+        } catch (Exception e) {
+            LOGGER.error(" Deserialization error: reject poisonous message for 
distributed Deleted message vault callback", e);
+            // Deserialization error: reject poisonous messages
+            delivery.nack(!REQUEUE);
+            return Mono.empty();
+        }
+    }
+
+    @Override
+    public Mono<Void> 
forMessage(DeleteMessageListener.DelectedMessageCopyCommand command) {
+        CopyCommandDTO dto = CopyCommandDTO.of(command);
+        try {
+            byte[] bytes = objectMapper.writeValueAsBytes(dto);
+            return sender.send(Mono.just(new OutboundMessage(EXCHANGE, 
EMPTY_ROUTING_KEY, new AMQP.BasicProperties.Builder()
+                .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+                .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+                .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+                .build(), bytes)));
+        } catch (JsonProcessingException e) {
+            return Mono.error(e);
+        }
+    }
+
+
+}
diff --git 
a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
new file mode 100644
index 0000000000..25bd0fd7cd
--- /dev/null
+++ 
b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultModule.java
@@ -0,0 +1,69 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.mailbox;
+
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.mailbox.cassandra.DeleteMessageListener;
+import org.apache.james.modules.vault.DeletedMessageVaultModule;
+import org.apache.james.vault.DeletedMessageVault;
+import org.apache.james.vault.blob.BlobStoreDeletedMessageVault;
+import org.apache.james.vault.blob.BucketNameGenerator;
+import 
org.apache.james.vault.dto.DeletedMessageWithStorageInformationConverter;
+import org.apache.james.vault.metadata.CassandraDeletedMessageMetadataVault;
+import org.apache.james.vault.metadata.DeletedMessageMetadataModule;
+import org.apache.james.vault.metadata.DeletedMessageMetadataVault;
+import org.apache.james.vault.metadata.DeletedMessageVaultDeletionCallback;
+import org.apache.james.vault.metadata.MetadataDAO;
+import org.apache.james.vault.metadata.StorageInformationDAO;
+import org.apache.james.vault.metadata.UserPerBucketDAO;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Scopes;
+import com.google.inject.multibindings.Multibinder;
+
+public class DistributedDeletedMessageVaultModule extends AbstractModule {
+    @Override
+    protected void configure() {
+        install(new DeletedMessageVaultModule());
+
+        Multibinder<CassandraModule> cassandraDataDefinitions = 
Multibinder.newSetBinder(binder(), CassandraModule.class);
+        cassandraDataDefinitions
+            .addBinding()
+            .toInstance(DeletedMessageMetadataModule.MODULE);
+
+        bind(MetadataDAO.class).in(Scopes.SINGLETON);
+        bind(StorageInformationDAO.class).in(Scopes.SINGLETON);
+        bind(UserPerBucketDAO.class).in(Scopes.SINGLETON);
+        
bind(DeletedMessageWithStorageInformationConverter.class).in(Scopes.SINGLETON);
+
+        bind(CassandraDeletedMessageMetadataVault.class).in(Scopes.SINGLETON);
+        bind(DeletedMessageMetadataVault.class)
+            .to(CassandraDeletedMessageMetadataVault.class);
+
+        bind(BucketNameGenerator.class).in(Scopes.SINGLETON);
+        bind(BlobStoreDeletedMessageVault.class).in(Scopes.SINGLETON);
+        bind(DeletedMessageVault.class)
+            .to(BlobStoreDeletedMessageVault.class);
+
+        Multibinder.newSetBinder(binder(), 
DeleteMessageListener.DeletionCallback.class)
+            .addBinding()
+            .to(DistributedDeletedMessageVaultDeletionCallback.class);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@james.apache.org
For additional commands, e-mail: notifications-h...@james.apache.org

Reply via email to