This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7715e6b25b364772c90534bd4fae7b9e9ef7cb61 Author: Quan Tran <[email protected]> AuthorDate: Thu Mar 19 10:50:33 2026 +0700 JAMES-4191 DeleteMessageListener: relax dispatching MessageContentDeletionEvent Emit always upon message deletion. No longer emit only when global unreferenced. --- .../mailbox/cassandra/DeleteMessageListener.java | 63 +++++++++++----------- .../mailbox/postgres/DeleteMessageListener.java | 31 ++++++----- 2 files changed, 45 insertions(+), 49 deletions(-) diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java index b4a80b22c9..803677de5c 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java @@ -160,27 +160,26 @@ public class DeleteMessageListener implements EventListener.ReactiveGroupEventLi CassandraId mailboxId = (CassandraId) mailboxDeletion.getMailboxId(); - return handleMailboxDeletion(mailboxId, mailboxDeletion.getMailboxPath()); + return handleMailboxDeletion(mailboxId, mailboxDeletion.getMailboxPath(), mailboxDeletion.getMailboxACL()); } return Mono.empty(); } - private Mono<Void> handleMailboxDeletion(CassandraId mailboxId, MailboxPath path) { + private Mono<Void> handleMailboxDeletion(CassandraId mailboxId, MailboxPath path, MailboxACL mailboxACL) { int prefetch = 1; - return getMailboxACL(mailboxId) - .flatMap(mailboxACL -> Flux.mergeDelayError(prefetch, - messageIdDAO.retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()) - .concatMap(metadata -> handleMessageDeletionAsPartOfMailboxDeletion((CassandraMessageId) metadata.getComposedMessageId().getComposedMessageId().getMessageId(), - metadata.getComposedMessageId().getThreadId(), metadata.getComposedMessageId().getFlags(), mailboxId, path.getUser(), path, mailboxACL) - .then(imapUidDAO.delete((CassandraMessageId) metadata.getComposedMessageId().getComposedMessageId().getMessageId(), mailboxId)) - .then(messageIdDAO.delete(mailboxId, metadata.getComposedMessageId().getComposedMessageId().getUid()))), - deleteAcl(mailboxId), - applicableFlagDAO.delete(mailboxId), - firstUnseenDAO.removeAll(mailboxId), - deletedMessageDAO.removeAll(mailboxId), - counterDAO.delete(mailboxId), - recentsDAO.delete(mailboxId)) - .then()); + return Flux.mergeDelayError(prefetch, + messageIdDAO.retrieveMessages(mailboxId, MessageRange.all(), Limit.unlimited()) + .concatMap(metadata -> handleMessageDeletionAsPartOfMailboxDeletion((CassandraMessageId) metadata.getComposedMessageId().getComposedMessageId().getMessageId(), + metadata.getComposedMessageId().getThreadId(), metadata.getComposedMessageId().getFlags(), mailboxId, path.getUser(), path, mailboxACL) + .then(imapUidDAO.delete((CassandraMessageId) metadata.getComposedMessageId().getComposedMessageId().getMessageId(), mailboxId)) + .then(messageIdDAO.delete(mailboxId, metadata.getComposedMessageId().getComposedMessageId().getUid()))), + deleteAcl(mailboxId), + applicableFlagDAO.delete(mailboxId), + firstUnseenDAO.removeAll(mailboxId), + deletedMessageDAO.removeAll(mailboxId), + counterDAO.delete(mailboxId), + recentsDAO.delete(mailboxId)) + .then(); } private Mono<Void> handleMessageDeletion(Expunged expunged) { @@ -202,13 +201,12 @@ public class DeleteMessageListener implements EventListener.ReactiveGroupEventLi } private Mono<Void> handleMessageDeletion(CassandraMessageId messageId, MailboxId mailboxId, ThreadId threadId, Flags flags, Username owner, MailboxPath mailboxPath) { - return Mono.just(messageId) - .filterWhen(this::isReferenced) - .flatMap(id -> Mono.zip(readMessage(id), getMailboxACL((CassandraId) mailboxId)) - .flatMap(tuple -> dispatchMessageContentDeletionEvent(mailboxId, owner, tuple.getT2(), flags, tuple.getT1(), mailboxPath) - .thenReturn(tuple.getT1())) - .flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message)) - .flatMap(this::deleteMessageBlobs) + return Mono.zip(readMessage(messageId), getMailboxACL((CassandraId) mailboxId)) + .flatMap(tuple -> dispatchMessageContentDeletionEvent(mailboxId, owner, tuple.getT2(), flags, tuple.getT1(), mailboxPath) + .thenReturn(tuple.getT1())) + .filterWhen(message -> isUnreferenced(messageId)) + .flatMap(message -> deleteUnreferencedAttachments(message) + .then(deleteMessageBlobs(message)) .then(messageDAOV3.delete(messageId)) .then(threadLookupDAO.selectOneRow(threadId, messageId) .flatMap(key -> threadDAO.deleteSome(key.getUsername(), key.getMimeMessageIds()) @@ -244,18 +242,17 @@ public class DeleteMessageListener implements EventListener.ReactiveGroupEventLi } private Mono<Void> handleMessageDeletionAsPartOfMailboxDeletion(CassandraMessageId messageId, ThreadId threadId, Flags flags, CassandraId excludedId, Username owner, MailboxPath mailboxPath, MailboxACL mailboxACL) { - return Mono.just(messageId) - .filterWhen(id -> isReferenced(id, excludedId)) - .flatMap(id -> readMessage(id) - .flatMap(message -> dispatchMessageContentDeletionEvent(excludedId, owner, mailboxACL, flags, message, mailboxPath) - .thenReturn(message))) - .flatMap(message -> deleteUnreferencedAttachments(message).thenReturn(message)) - .flatMap(this::deleteMessageBlobs) + return readMessage(messageId) + .flatMap(message -> dispatchMessageContentDeletionEvent(excludedId, owner, mailboxACL, flags, message, mailboxPath) + .thenReturn(message)) + .filterWhen(message -> isUnreferenced(messageId, excludedId)) + .flatMap(message -> deleteUnreferencedAttachments(message) + .then(deleteMessageBlobs(message)) .then(messageDAOV3.delete(messageId)) .then(threadLookupDAO.selectOneRow(threadId, messageId) .flatMap(key -> threadDAO.deleteSome(key.getUsername(), key.getMimeMessageIds()) .collectList())) - .then(threadLookupDAO.deleteOneRow(threadId, messageId)); + .then(threadLookupDAO.deleteOneRow(threadId, messageId))); } private Mono<MessageRepresentation> deleteMessageBlobs(MessageRepresentation message) { @@ -279,13 +276,13 @@ public class DeleteMessageListener implements EventListener.ReactiveGroupEventLi .then(); } - private Mono<Boolean> isReferenced(CassandraMessageId id) { + private Mono<Boolean> isUnreferenced(CassandraMessageId id) { return imapUidDAO.retrieve(id, ALL_MAILBOXES, chooseReadConsistencyUponWrites()) .hasElements() .map(negate()); } - private Mono<Boolean> isReferenced(CassandraMessageId id, CassandraId excludedId) { + private Mono<Boolean> isUnreferenced(CassandraMessageId id, CassandraId excludedId) { return imapUidDAO.retrieve(id, ALL_MAILBOXES, chooseReadConsistencyUponWrites()) .filter(metadata -> !metadata.getComposedMessageId().getComposedMessageId().getMailboxId().equals(excludedId)) .hasElements() diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/DeleteMessageListener.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/DeleteMessageListener.java index cdaffdd8e8..240eef1bd6 100644 --- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/DeleteMessageListener.java +++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/DeleteMessageListener.java @@ -121,11 +121,10 @@ public class DeleteMessageListener implements EventListener.ReactiveGroupEventLi PostgresAttachmentDAO attachmentDAO = attachmentDAOFactory.create(event.getUsername().getDomainPart()); PostgresThreadDAO threadDAO = threadDAOFactory.create(event.getUsername().getDomainPart()); - return mailboxACL(event.getMailboxPath().getUser(), event.getMailboxId()) - .flatMapMany(mailboxACL -> postgresMailboxMessageDAO.deleteByMailboxId((PostgresMailboxId) event.getMailboxId()) - .flatMap(metaData -> handleMessageDeletion(postgresMessageDAO, postgresMailboxMessageDAO, attachmentDAO, threadDAO, - (PostgresMessageId) metaData.getMessageId(), event.getMailboxId(), event.getMailboxPath().getUser(), metaData.getFlags(), event.getMailboxPath(), mailboxACL), - LOW_CONCURRENCY)) + return postgresMailboxMessageDAO.deleteByMailboxId((PostgresMailboxId) event.getMailboxId()) + .flatMap(metaData -> handleMessageDeletion(postgresMessageDAO, postgresMailboxMessageDAO, attachmentDAO, threadDAO, + (PostgresMessageId) metaData.getMessageId(), event.getMailboxId(), event.getMailboxPath().getUser(), metaData.getFlags(), event.getMailboxPath(), event.getMailboxACL()), + LOW_CONCURRENCY) .then(); } @@ -151,11 +150,11 @@ public class DeleteMessageListener implements EventListener.ReactiveGroupEventLi Username owner, Flags flags, MailboxPath mailboxPath) { - return Mono.just(messageId) + return Mono.zip(postgresMessageDAO.retrieveMessage(messageId), mailboxACL(owner, mailboxId)) + .flatMap(tuple -> dispatchMessageContentDeletionEvent(mailboxId, owner, tuple.getT2(), flags, tuple.getT1(), mailboxPath) + .thenReturn(messageId)) .filterWhen(msgId -> isUnreferenced(msgId, postgresMailboxMessageDAO)) - .flatMap(msgId -> Mono.zip(postgresMessageDAO.retrieveMessage(messageId), mailboxACL(owner, mailboxId)) - .flatMap(tuple -> dispatchMessageContentDeletionEvent(mailboxId, owner, tuple.getT2(), flags, tuple.getT1(), mailboxPath))) - .then(deleteBodyBlob(msgId, postgresMessageDAO)) + .flatMap(msgId -> deleteBodyBlob(msgId, postgresMessageDAO) .then(deleteAttachmentIfEnabled(msgId, attachmentDAO)) .then(threadDAO.deleteSome(owner, msgId)) .then(postgresMessageDAO.deleteByMessageId(msgId))); @@ -171,14 +170,14 @@ public class DeleteMessageListener implements EventListener.ReactiveGroupEventLi Flags flags, MailboxPath mailboxPath, MailboxACL mailboxACL) { - return Mono.just(messageId) + return postgresMessageDAO.retrieveMessage(messageId) + .flatMap(messageRepresentation -> dispatchMessageContentDeletionEvent(mailboxId, owner, mailboxACL, flags, messageRepresentation, mailboxPath) + .thenReturn(messageId)) .filterWhen(msgId -> isUnreferenced(msgId, postgresMailboxMessageDAO)) - .flatMap(msgId -> postgresMessageDAO.retrieveMessage(messageId) - .flatMap(messageRepresentation -> dispatchMessageContentDeletionEvent(mailboxId, owner, mailboxACL, flags, messageRepresentation, mailboxPath)) - .then(deleteBodyBlob(msgId, postgresMessageDAO)) - .then(deleteAttachmentIfEnabled(msgId, attachmentDAO)) - .then(threadDAO.deleteSome(owner, msgId)) - .then(postgresMessageDAO.deleteByMessageId(msgId))); + .flatMap(msgId -> deleteBodyBlob(msgId, postgresMessageDAO) + .then(deleteAttachmentIfEnabled(messageId, attachmentDAO)) + .then(threadDAO.deleteSome(owner, messageId)) + .then(postgresMessageDAO.deleteByMessageId(messageId))); } private Mono<Void> dispatchMessageContentDeletionEvent(MailboxId mailboxId, Username owner, MailboxACL mailboxACL, Flags flags, MessageRepresentation message, MailboxPath mailboxPath) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
