Tran Hong Quan created JAMES-4154:
-------------------------------------

             Summary: Generalize RabbitMQ work queue code used for async 
deletion callback(s)
                 Key: JAMES-4154
                 URL: https://issues.apache.org/jira/browse/JAMES-4154
             Project: James Server
          Issue Type: Improvement
          Components: deletedMessageVault, Queue
    Affects Versions: master
            Reporter: Tran Hong Quan


h2. Why

Today, the deleted message vault handles the deleted message callback 
asynchronously, relying on a RabbitMQ work queue to avoid timeouts on 
time-consuming work, e.g. DTM for deletion of a mailbox having 1 million emails.

We may want other async deletion callback(s) that could rely on the RabbitMQ 
work queue too.

We should find a way to share the common RabbitMQ work queue setup and 
plug in different callback(s) easily.

h2. How
- Introduce `AsyncDeletionCallback` interface
```java
public interface AsyncDeletionCallback extends DeleteMessageListener.DeletionCallback {

}
```

- Implement `AggregatedAsyncDeletionCallback`, which uses RabbitMQ work queue 
code similar to the current `DistributedDeletedMessageVaultDeletionCallback`, but 
takes a set of `AsyncDeletionCallback` as an argument.

   The idea is to share a single RabbitMQ work queue, e.g. 
`async-deletion-work-queue`, to handle all the async deletion callback(s).

```java
    private Mono<Void> handleMessage(AcknowledgableDelivery delivery) {
        try {
            CopyCommandDTO copyCommandDTO = objectMapper.readValue(delivery.getBody(), CopyCommandDTO.class);

            // Run every registered callback for this message; ack only once all have completed.
            return Flux.fromIterable(asyncDeletionCallbacks)
                .flatMap(callback -> callback.forMessage(copyCommandDTO.asPojo(mailboxIdFactory, messageIdFactory, blobIdFactory)))
                .then()
                .timeout(Duration.ofMinutes(5))
                .doOnSuccess(any -> delivery.ack())
                .doOnCancel(() -> delivery.nack(REQUEUE))
                .onErrorResume(e -> {
                    LOGGER.error("Failed executing async deletion callbacks for {}", copyCommandDTO.messageId, e);
                    delivery.nack(REQUEUE);
                    return Mono.empty();
                });
        } catch (Exception e) {
            // Assumption (not in the original snippet): a payload that cannot be
            // deserialized is rejected without requeue so it is not redelivered forever.
            delivery.nack(!REQUEUE);
            return Mono.error(e);
        }
    }
```
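
To make the ack/nack semantics of a shared queue concrete, here is a minimal, dependency-free sketch. It is not the proposed implementation: `Callback` and `Delivery` are hypothetical stand-ins for `AsyncDeletionCallback` and the RabbitMQ delivery, and `CompletableFuture` replaces the Reactor types so the example runs on a plain JDK. Unlike the `Flux` pipeline, `allOf` waits for every callback to finish before deciding.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AsyncDeletionSketch {
    /** Hypothetical stand-in for AsyncDeletionCallback (the real one is reactive). */
    interface Callback {
        CompletableFuture<Void> forMessage(String messageId);
    }

    /** Hypothetical stand-in for a RabbitMQ delivery; it only records the outcome. */
    static class Delivery {
        final List<String> outcomes = new ArrayList<>();

        void ack() {
            outcomes.add("ack");
        }

        void nack(boolean requeue) {
            outcomes.add(requeue ? "nack-requeue" : "nack-drop");
        }
    }

    /** Runs all callbacks for one message: ack if all succeed, nack with requeue otherwise. */
    static CompletableFuture<Void> handle(List<Callback> callbacks, String messageId, Delivery delivery) {
        CompletableFuture<?>[] runs = callbacks.stream()
            .map(callback -> safely(callback, messageId))
            .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(runs)
            .<Void>handle((ignored, error) -> {
                if (error == null) {
                    delivery.ack();
                } else {
                    delivery.nack(true);
                }
                return null;
            });
    }

    /** Converts a synchronous exception into a failed future so one bad callback cannot skip the nack. */
    private static CompletableFuture<Void> safely(Callback callback, String messageId) {
        try {
            return callback.forMessage(messageId);
        } catch (RuntimeException e) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) {
        Callback ok = id -> CompletableFuture.completedFuture(null);
        Callback failing = id -> {
            throw new IllegalStateException("boom for " + id);
        };

        Delivery first = new Delivery();
        handle(Arrays.asList(ok, ok), "message-1", first).join();
        System.out.println(first.outcomes); // [ack]

        Delivery second = new Delivery();
        handle(Arrays.asList(ok, failing), "message-1", second).join();
        System.out.println(second.outcomes); // [nack-requeue]
    }
}
```

One consequence worth noting: because a single delivery drives all callbacks, a failure in any one of them requeues the message and the callbacks that already succeeded run again on redelivery, so each callback likely needs to be idempotent.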

- Refactor `DistributedDeletedMessageVaultDeletionCallback`: strip all the 
RabbitMQ code so that it just implements `AsyncDeletionCallback`.

- Refactor `DeletedMessageVaultWorkQueueReconnectionHandler` so it handles 
reconnection for `AggregatedAsyncDeletionCallback` instead.

- Add a Guice binding to plug DTM in as an `AsyncDeletionCallback`.




