Tran Hong Quan created JAMES-4154:
-------------------------------------
Summary: Generalize RabbitMQ work queue code used for async
deletion callback(s)
Key: JAMES-4154
URL: https://issues.apache.org/jira/browse/JAMES-4154
Project: James Server
Issue Type: Improvement
Components: deletedMessageVault, Queue
Affects Versions: master
Reporter: Tran Hong Quan
h2. Why
Today, the deleted message vault handles its deleted message callback
asynchronously, relying on a RabbitMQ work queue to avoid timeouts,
e.g. DTM for the deletion of a mailbox holding 1 million emails.
We may want other async deletion callback(s) that rely on the RabbitMQ
work queue too.
We should find a way to share the RabbitMQ work queue setup between them, and
to plug in different callback(s) easily.
h2. How
- Introduce an `AsyncDeletionCallback` interface:
```java
public interface AsyncDeletionCallback extends DeleteMessageListener.DeletionCallback {
}
```
- Implement `AggregatedAsyncDeletionCallback`, which uses RabbitMQ work queue
code similar to the current `DistributedDeletedMessageVaultDeletionCallback` but
takes a set of `AsyncDeletionCallback` as an argument.
The idea is to share a single RabbitMQ work queue, e.g.
`async-deletion-work-queue`, to handle all the async deletion callback(s).
```java
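private Mono<Void> handleMessage(AcknowledgableDelivery delivery) {
    try {
        CopyCommandDTO copyCommandDTO = objectMapper.readValue(delivery.getBody(), CopyCommandDTO.class);

        // Run every registered callback for this message, then ack or nack once.
        return Flux.fromIterable(asyncDeletionCallbacks)
            .flatMap(callback -> callback.forMessage(
                copyCommandDTO.asPojo(mailboxIdFactory, messageIdFactory, blobIdFactory)))
            .then()
            .timeout(Duration.ofMinutes(5))
            .doOnSuccess(any -> delivery.ack())
            .doOnCancel(() -> delivery.nack(REQUEUE))
            .onErrorResume(e -> {
                LOGGER.error("Failed executing async deletion callbacks for {}", copyCommandDTO.messageId, e);
                delivery.nack(REQUEUE);
                return Mono.empty();
            });
    } catch (Exception e) {
        // Assumption (not in the original snippet): a payload that cannot be
        // deserialized is rejected without requeue so it is not redelivered forever.
        LOGGER.error("Failed deserializing async deletion work queue message", e);
        delivery.nack(!REQUEUE);
        return Mono.empty();
    }
}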
```
- Refactor `DistributedDeletedMessageVaultDeletionCallback`: strip all the
RabbitMQ code and just implement `AsyncDeletionCallback`.
- Refactor `DeletedMessageVaultWorkQueueReconnectionHandler` so it handles
reconnection for `AggregatedAsyncDeletionCallback` instead.
- Add a Guice binding to plug DTM in as an `AsyncDeletionCallback`.
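
To make the intended ack/nack behaviour of the shared handler easier to discuss, here is a minimal, dependency-free sketch of the fan-out. It is not James code: `AggregatedCallbackSketch`, `RecordingDelivery` and `Callback` are hypothetical stand-ins for `AggregatedAsyncDeletionCallback`, `AcknowledgableDelivery` and `AsyncDeletionCallback`, and `CompletableFuture` is swapped in for Reactor's `Mono` so that the example runs on the JDK alone (Java 9+).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: every name below is hypothetical, not James code.
public class AggregatedCallbackSketch {

    /** Stand-in for AcknowledgableDelivery that records what happened to the message. */
    public static final class RecordingDelivery {
        public final List<String> events = new ArrayList<>();

        public void ack() {
            events.add("ack");
        }

        public void nack(boolean requeue) {
            events.add(requeue ? "nack-requeue" : "nack-drop");
        }
    }

    /** Stand-in for AsyncDeletionCallback, with CompletableFuture replacing Mono. */
    public interface Callback {
        CompletableFuture<Void> forMessage(String messageId);
    }

    private final Set<Callback> callbacks;

    public AggregatedCallbackSketch(Set<Callback> callbacks) {
        this.callbacks = callbacks;
    }

    /** Runs every callback for one delivery: ack if all succeed, nack with requeue otherwise. */
    public CompletableFuture<Void> handle(String messageId, RecordingDelivery delivery) {
        CompletableFuture<?>[] pending = callbacks.stream()
            .map(callback -> invoke(callback, messageId))
            .toArray(CompletableFuture[]::new);

        return CompletableFuture.allOf(pending)
            .orTimeout(5, TimeUnit.MINUTES)
            .handle((ignored, error) -> {
                if (error == null) {
                    delivery.ack();
                } else {
                    delivery.nack(true);
                }
                return null;
            });
    }

    /** Converts a synchronous failure into a failed future so it is handled like any other error. */
    private static CompletableFuture<Void> invoke(Callback callback, String messageId) {
        try {
            return callback.forMessage(messageId);
        } catch (RuntimeException e) {
            return CompletableFuture.failedFuture(e);
        }
    }
}
```

One consequence worth keeping in mind: with a single shared queue, a message requeued because one callback failed is replayed to every callback, so each `AsyncDeletionCallback` would presumably need to be idempotent.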