[
https://issues.apache.org/jira/browse/JAMES-4154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18042717#comment-18042717
]
Benoit Tellier commented on JAMES-4154:
---------------------------------------
Or we could just play around the Event bus concept, have a "Content deletion
event" that 2 listeners dedicated to LLM deletes AND the vault could consume.
Empty registration key would ensure we never publish to redis.
1 bird 3 stone:
- pg support for free
- multiple listener for free
- we even drop a lot of PG code
Maybe just reuse event bus is the best way forward? Likely with an event bus
dedicated to the use case?
> Generalize RabbitMQ work queue code used for async deletion callback(s)
> -----------------------------------------------------------------------
>
> Key: JAMES-4154
> URL: https://issues.apache.org/jira/browse/JAMES-4154
> Project: James Server
> Issue Type: Improvement
> Components: deletedMessageVault, Queue
> Affects Versions: master
> Reporter: Tran Hong Quan
> Priority: Minor
>
> h2. Why
> Today, we have the deleted message vault that handles deleted message
> callback asynchronously, relying on the RabbitMQ work queue to avoid timeout
> consuming e.g. DTM for deletion of a mailbox having 1 million emails.
> We may want to have other async deletion callback(s) that could rely on
> RabbiMQ work queue too.
> We should find a way to mutualize the shared RabbitMQ work queue setup, and
> plug different callback(s) easily.
> h2. How
> - Introduce `AsyncDeletionCallback` interface
> ```java
> public interface AsyncDeletionCallback extends
> DeleteMessageListener.DeletionCallback {
> }
> ```
> - Implement `AggregatedAsyncDeletionCallback` that uses the RabbitMQ work
> queue code similar to
> current`DistributedDeletedMessageVaultDeletionCallback`, but takes an
> `AsyncDeletionCallback` set as an argument.
> The idea is to share the same RabbitMQ work queue e.g.
> `async-deletion-work-queue`, to handle all the async deletion callback(s).
> ```java
> private Mono<Void> handleMessage(AcknowledgableDelivery delivery) {
> try {
> CopyCommandDTO copyCommandDTO =
> objectMapper.readValue(delivery.getBody(), CopyCommandDTO.class);
> return Flux.fromIterable(asyncDeletionCallbacks)
> .flatMap(callback ->
> callback.forMessage(copyCommandDTO.asPojo(mailboxIdFactory, messageIdFactory,
> blobIdFactory)))
> .then()
> .timeout(Duration.ofMinutes(5))
> .doOnSuccess(any -> delivery.ack())
> .doOnCancel(() -> delivery.nack(REQUEUE))
> .onErrorResume(e -> {
> LOGGER.error("Failed executing async deletion callbacks
> for {}", copyCommandDTO.messageId, e);
> delivery.nack(REQUEUE);
> return Mono.empty();
> });
> ```
> - Refactor `DistributedDeletedMessageVaultDeletionCallback`: strip all the
> rabbitmq code, just implement `AsyncDeletionCallback`
> - Refactor `DeletedMessageVaultWorkQueueReconnectionHandler` so it handles
> reconnection for `AggregatedAsyncDeletionCallback` instead.
> - Guice binding to plug DTM as an `AsyncDeletionCallback`
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]