This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 91595e7ecc19d9fe5a121c5c8c3963bd4414ecd9 Author: Benoit Tellier <[email protected]> AuthorDate: Thu May 13 17:37:05 2021 +0700 [REFACTORING] MailboxChangeListener was performing some blocking calls... --- .../james/mailbox/store/StoreRightManager.java | 1 + .../james/jmap/api/change/MailboxChange.java | 6 +- .../james/jmap/change/MailboxChangeListener.scala | 85 ++++++++++------------ 3 files changed, 44 insertions(+), 48 deletions(-) diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java index 43578ea..7274f0c 100644 --- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java +++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreRightManager.java @@ -151,6 +151,7 @@ public class StoreRightManager implements RightManager { return mailbox.getACL(); } + @Override public MailboxACL listRights(MailboxId mailboxId, MailboxSession session) throws MailboxException { MailboxMapper mapper = mailboxSessionMapperFactory.getMailboxMapper(session); Mailbox mailbox = blockOptional(mapper.findMailboxById(mailboxId)) diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java index c1deef5..3da4de7 100644 --- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java +++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java @@ -130,14 +130,14 @@ public class MailboxChange implements JmapChange { this.stateFactory = stateFactory; } - public List<JmapChange> fromMailboxAdded(MailboxAdded mailboxAdded, ZonedDateTime now) { - return ImmutableList.of(MailboxChange.builder() + public JmapChange fromMailboxAdded(MailboxAdded mailboxAdded, ZonedDateTime now) { + return MailboxChange.builder() .accountId(AccountId.fromUsername(mailboxAdded.getUsername())) .state(stateFactory.generate()) .date(now) .isCountChange(false) .created(ImmutableList.of(mailboxAdded.getMailboxId())) - .build()); + .build(); } public List<JmapChange> fromMailboxRenamed(MailboxRenamed mailboxRenamed, ZonedDateTime now, List<AccountId> sharees) { diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala index 84337db..b4c6aa9 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala @@ -31,10 +31,9 @@ import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, Jma import org.apache.james.jmap.api.model.AccountId import org.apache.james.jmap.change.MailboxChangeListener.LOGGER import org.apache.james.jmap.core.UuidState +import org.apache.james.mailbox.MailboxManager import org.apache.james.mailbox.events.MailboxEvents.{Added, Expunged, FlagsUpdated, MailboxACLUpdated, MailboxAdded, MailboxDeletion, MailboxEvent, MailboxRenamed} -import org.apache.james.mailbox.exception.MailboxException import org.apache.james.mailbox.model.{MailboxACL, MailboxId} -import org.apache.james.mailbox.{MailboxManager, MailboxSession} import org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY import org.reactivestreams.Publisher import org.slf4j.{Logger, LoggerFactory} @@ -57,44 +56,43 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus: clock: Clock) extends ReactiveGroupEventListener { override def reactiveEvent(event: Event): Publisher[Void] = - handleEvent(event.asInstanceOf[MailboxEvent]) - .`then`(SMono.empty[Void]) - .asJava + jmapChanges(event.asInstanceOf[MailboxEvent]) + .flatMap(saveChangeEvent, DEFAULT_CONCURRENCY) + .`then`() override def getDefaultGroup: Group = MailboxChangeListenerGroup() override def isHandling(event: Event): Boolean = event.isInstanceOf[MailboxEvent] - private def handleEvent(mailboxEvent: MailboxEvent): SMono[Unit] = { + private def jmapChanges(mailboxEvent: MailboxEvent): SFlux[JmapChange] = { val now: ZonedDateTime = ZonedDateTime.now(clock) val mailboxId: MailboxId = mailboxEvent.getMailboxId val username: Username = mailboxEvent.getUsername - SFlux.fromIterable( - mailboxEvent match { - case mailboxAdded: MailboxAdded => - mailboxChangeFactory.fromMailboxAdded(mailboxAdded, now).asScala - case mailboxRenamed: MailboxRenamed => - mailboxChangeFactory.fromMailboxRenamed(mailboxRenamed, now, getSharees(mailboxId, username).asJava).asScala - case mailboxACLUpdated: MailboxACLUpdated => - mailboxChangeFactory.fromMailboxACLUpdated(mailboxACLUpdated, now, getSharees(mailboxId, username).asJava).asScala - case mailboxDeletion: MailboxDeletion => - mailboxChangeFactory.fromMailboxDeletion(mailboxDeletion, now).asScala - case added: Added => - val sharees = getSharees(mailboxId, username).asJava - mailboxChangeFactory.fromAdded(added, now, sharees).asScala - .concat(emailChangeFactory.fromAdded(added, now, sharees).asScala) - case flagsUpdated: FlagsUpdated => - val sharees = getSharees(mailboxId, username).asJava - mailboxChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees).asScala - .concat(emailChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees).asScala) - case expunged: Expunged => - val sharees = getSharees(mailboxId, username) - mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala - .concat(emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava).asScala) - }) - .flatMap(saveChangeEvent, DEFAULT_CONCURRENCY) - .`then`() + mailboxEvent match { + case mailboxAdded: MailboxAdded => + SFlux.just(mailboxChangeFactory.fromMailboxAdded(mailboxAdded, now)) + case mailboxRenamed: MailboxRenamed => + getSharees(mailboxId, username) + .flatMapIterable(sharees => mailboxChangeFactory.fromMailboxRenamed(mailboxRenamed, now, sharees.asJava).asScala) + case mailboxACLUpdated: MailboxACLUpdated => + getSharees(mailboxId, username) + .flatMapIterable(sharees => mailboxChangeFactory.fromMailboxACLUpdated(mailboxACLUpdated, now, sharees.asJava).asScala) + case mailboxDeletion: MailboxDeletion => + SFlux.fromIterable(mailboxChangeFactory.fromMailboxDeletion(mailboxDeletion, now).asScala) + case added: Added => + getSharees(mailboxId, username) + .flatMapIterable(sharees => mailboxChangeFactory.fromAdded(added, now, sharees.asJava).asScala + .concat(emailChangeFactory.fromAdded(added, now, sharees.asJava).asScala)) + case flagsUpdated: FlagsUpdated => + getSharees(mailboxId, username) + .flatMapIterable(sharees => mailboxChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala + .concat(emailChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala)) + case expunged: Expunged => + getSharees(mailboxId, username) + .flatMapIterable(sharees => mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala + .concat(emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava).asScala)) + } } private def saveChangeEvent(jmapChange: JmapChange): Publisher[Void] = @@ -103,23 +101,21 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus: case emailChange: EmailChange => emailChangeRepository.save(emailChange) }).`then`(SMono(eventBus.dispatch(toStateChangeEvent(jmapChange), AccountIdRegistrationKey(jmapChange.getAccountId)))) - - private def getSharees(mailboxId: MailboxId, username: Username): List[AccountId] = { - val mailboxSession: MailboxSession = mailboxManager.createSystemSession(username) - try { - val mailboxACL = mailboxManager.listRights(mailboxId, mailboxSession) - mailboxACL.getEntries.keySet + private def getSharees(mailboxId: MailboxId, username: Username): SMono[List[AccountId]] = { + val session = mailboxManager.createSystemSession(username) + SMono(mailboxManager.getMailboxReactive(mailboxId, session)) + .map(mailbox => mailbox.getResolvedAcl(session)) + .map(mailboxACL => mailboxACL.getEntries.keySet .asScala .filter(!_.isNegative) .filter(_.getNameType == MailboxACL.NameType.user) .map(_.getName) .map(AccountId.fromString) - .toList - } catch { - case e: MailboxException => - LOGGER.warn("Could not get sharees for mailbox [%s] when listening to change events", mailboxId) - List.empty - } + .toList) + .onErrorResume(e => { + LOGGER.warn("Could not get sharees for mailbox [%s] when listening to change events", mailboxId, e) + SMono.just(List.empty) + }) } private def toStateChangeEvent(jmapChange: JmapChange): StateChangeEvent = jmapChange match { @@ -130,8 +126,7 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus: Some(UuidState.fromJava(emailChange.getState)) .filter(_ => !emailChange.getCreated.isEmpty) .map(emailDeliveryState => Map(EmailDeliveryTypeName -> emailDeliveryState)) - .getOrElse(Map())).toMap - ) + .getOrElse(Map())).toMap) case mailboxChange: MailboxChange => StateChangeEvent( eventId = EventId.random(), username = Username.of(mailboxChange.getAccountId.getIdentifier), --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
