This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
     new 4da699c622 [FIX] Better error management for DeletedMessageVault consumer (#2651)
4da699c622 is described below
commit 4da699c62288c8fc189ac1d4fd9681719dfdb246
Author: Trần Hồng Quân <[email protected]>
AuthorDate: Mon Mar 3 08:03:01 2025 +0700
[FIX] Better error management for DeletedMessageVault consumer (#2651)
Otherwise, errors (from the blob store, for example) could crash the DTM consumer.
Co-authored-by: Benoit TELLIER <[email protected]>
---
.../mailbox/DistributedDeletedMessageVaultDeletionCallback.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
index 6bed7178c5..d848c037c1 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/mailbox/DistributedDeletedMessageVaultDeletionCallback.java
@@ -226,7 +226,7 @@ public class DistributedDeletedMessageVaultDeletionCallback implements DeleteMes
                 receiverProvider::createReceiver,
                 receiver -> receiver.consumeManualAck(QUEUE, new ConsumeOptions().qos(QOS)),
                 Receiver::close)
-            .flatMap(this::handleMessage)
+            .flatMap(this::handleMessage, QOS)
             .subscribeOn(Schedulers.boundedElastic())
             .subscribe();
     }
@@ -248,9 +248,10 @@ public class DistributedDeletedMessageVaultDeletionCallback implements DeleteMes
             return callback.forMessage(copyCommandDTO.asPojo(mailboxIdFactory, messageIdFactory, blobIdFactory))
                 .timeout(Duration.ofMinutes(5))
-                .doOnError(e -> {
+                .onErrorResume(e -> {
                     LOGGER.error("Failed executing deletion callback for {}", copyCommandDTO.messageId, e);
                     delivery.nack(REQUEUE);
+                    return Mono.empty();
                 })
                 .doOnSuccess(any -> delivery.ack())
                 .doOnCancel(() -> delivery.nack(REQUEUE));
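
Note on the change from doOnError to onErrorResume: in Reactor, doOnError only runs a side effect and lets the error continue downstream, whereas onErrorResume replaces the error with a fallback publisher (here Mono.empty()), so the outer consuming pipeline is not terminated. The sketch below is a plain-Java analogy of that difference, not the James code and not Reactor itself. The class ConsumerErrorSketch, the process method, the "boom" payload, and the "ack:"/"nack:" log strings are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class ConsumerErrorSketch {

    // Hypothetical stand-in for the deletion callback: fails for one payload.
    static void process(String message) {
        if (message.equals("boom")) {
            throw new IllegalStateException("simulated blob store failure");
        }
    }

    // Analogue of doOnError: the side effect (nack) runs, but the error keeps
    // propagating and ends the whole consuming loop.
    static List<String> consumePropagating(List<String> messages) {
        List<String> log = new ArrayList<>();
        try {
            for (String message : messages) {
                try {
                    process(message);
                    log.add("ack:" + message);
                } catch (RuntimeException e) {
                    log.add("nack:" + message);
                    throw e; // observed, not handled
                }
            }
        } catch (RuntimeException e) {
            log.add("consumer stopped");
        }
        return log;
    }

    // Analogue of onErrorResume(e -> Mono.empty()): the failure is nacked and
    // swallowed, so the loop moves on to the next message.
    static List<String> consumeRecovering(List<String> messages) {
        List<String> log = new ArrayList<>();
        for (String message : messages) {
            try {
                process(message);
                log.add("ack:" + message);
            } catch (RuntimeException e) {
                log.add("nack:" + message);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("a", "boom", "c");
        System.out.println(consumePropagating(messages)); // [ack:a, nack:boom, consumer stopped]
        System.out.println(consumeRecovering(messages));  // [ack:a, nack:boom, ack:c]
    }
}
```

In the propagating variant the message after the failure is never processed, which mirrors a consumer that stops after the first failing callback. In the recovering variant the failed message is nacked and consumption continues.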