This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 0bc550879e JAMES-2586 Reactify AttachmentBlobResolver
0bc550879e is described below
commit 0bc550879e5059ab65d8e62340ccd5d8dd431366
Author: Benoit TELLIER <[email protected]>
AuthorDate: Fri Jan 17 17:40:40 2025 +0100
JAMES-2586 Reactify AttachmentBlobResolver
---
.../apache/james/mailbox/AttachmentManager.java | 2 +
.../mailbox/store/StoreAttachmentManager.java | 8 +++
.../org/apache/james/modules/MailboxProbeImpl.java | 7 +++
.../rfc8621/contract/CustomMethodContract.scala | 6 +--
.../jmap/rfc8621/contract/DownloadContract.scala | 28 ++++++++++-
.../apache/james/jmap/routes/DownloadRoutes.scala | 58 ++++++++++++----------
6 files changed, 80 insertions(+), 29 deletions(-)
diff --git
a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
index 2387ce722c..c1686e4088 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/AttachmentManager.java
@@ -36,6 +36,8 @@ public interface AttachmentManager extends
AttachmentContentLoader {
AttachmentMetadata getAttachment(AttachmentId attachmentId, MailboxSession
mailboxSession) throws MailboxException, AttachmentNotFoundException;
+ Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId attachmentId,
MailboxSession mailboxSession);
+
List<AttachmentMetadata> getAttachments(List<AttachmentId> attachmentIds,
MailboxSession mailboxSession) throws MailboxException;
InputStream loadAttachmentContent(AttachmentId attachmentId,
MailboxSession mailboxSession) throws AttachmentNotFoundException, IOException;
diff --git
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
index 13e471afe9..570543fc5a 100644
---
a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
+++
b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreAttachmentManager.java
@@ -81,6 +81,14 @@ public class StoreAttachmentManager implements
AttachmentManager {
return attachment;
}
+ @Override
+ public Mono<AttachmentMetadata> getAttachmentReactive(AttachmentId
attachmentId, MailboxSession mailboxSession) {
+ return attachmentMapperFactory.getAttachmentMapper(mailboxSession)
+ .getAttachmentReactive(attachmentId)
+ .filterWhen(attachment -> existsReactive(attachmentId,
mailboxSession))
+ .switchIfEmpty(Mono.error(() -> new
AttachmentNotFoundException(attachmentId.getId())));
+ }
+
@Override
public List<AttachmentMetadata> getAttachments(List<AttachmentId>
attachmentIds, MailboxSession mailboxSession) throws MailboxException {
List<AttachmentMetadata> attachments =
attachmentMapperFactory.getAttachmentMapper(mailboxSession)
diff --git
a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/MailboxProbeImpl.java
b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/MailboxProbeImpl.java
index b01c6ea653..7829a4c87e 100644
---
a/server/container/guice/mailbox/src/main/java/org/apache/james/modules/MailboxProbeImpl.java
+++
b/server/container/guice/mailbox/src/main/java/org/apache/james/modules/MailboxProbeImpl.java
@@ -187,6 +187,13 @@ public class MailboxProbeImpl implements GuiceProbe,
MailboxProbe {
return messageManager.appendMessage(appendCommand,
mailboxSession).getId();
}
+ public MessageManager.AppendResult
appendMessageRetrieveAppendResult(String username, MailboxPath mailboxPath,
MessageManager.AppendCommand appendCommand)
+ throws MailboxException {
+ MailboxSession mailboxSession =
mailboxManager.createSystemSession(Username.of(username));
+ MessageManager messageManager = mailboxManager.getMailbox(mailboxPath,
mailboxSession);
+ return messageManager.appendMessage(appendCommand, mailboxSession);
+ }
+
public MessageManager.AppendResult appendMessageAndGetAppendResult(String
username, MailboxPath mailboxPath, MessageManager.AppendCommand appendCommand)
throws MailboxException {
MailboxSession mailboxSession =
mailboxManager.createSystemSession(Username.of(username));
diff --git
a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala
index 9b71b545fa..71ab030455 100644
---
a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala
+++
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala
@@ -221,11 +221,11 @@ case object CustomBlob extends Blob {
}
class CustomBlobResolver extends BlobResolver {
- override def resolve(blobId: org.apache.james.jmap.mail.BlobId,
mailboxSession: MailboxSession): BlobResolutionResult =
+ override def resolve(blobId: org.apache.james.jmap.mail.BlobId,
mailboxSession: MailboxSession): SMono[BlobResolutionResult] =
if (blobId.equals(CustomBlob.blobId)) {
- Applicable(SMono.just(CustomBlob))
+ SMono.just(Applicable(SMono.just(CustomBlob)))
} else {
- NonApplicable
+ SMono.just(NonApplicable)
}
}
diff --git
a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala
index 68f9fec1fb..5b2ea8b20d 100644
---
a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala
+++
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/DownloadContract.scala
@@ -32,7 +32,7 @@ import
org.apache.james.jmap.rfc8621.contract.DownloadContract.accountId
import
org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER,
ALICE_ACCOUNT_ID, ANDRE, BOB, BOB_PASSWORD, CEDRIC, DOMAIN, authScheme,
baseRequestSpecBuilder}
import org.apache.james.mailbox.MessageManager.AppendCommand
import org.apache.james.mailbox.model.MailboxACL.Right
-import org.apache.james.mailbox.model.{MailboxACL, MailboxPath, MessageId}
+import org.apache.james.mailbox.model.{AttachmentId, MailboxACL, MailboxPath,
MessageId}
import org.apache.james.mime4j.dom.Message
import org.apache.james.modules.{ACLProbeImpl, MailboxProbeImpl}
import org.apache.james.util.ClassLoaderUtils
@@ -92,6 +92,32 @@ trait DownloadContract {
.hasContent(expectedResponse)
}
+ @Test
+ def downloadMailboxAttachment(server: GuiceJamesServer): Unit = {
+ val path = MailboxPath.inbox(BOB)
+ server.getProbe(classOf[MailboxProbeImpl]).createMailbox(path)
+ val attachmentId: AttachmentId = server.getProbe(classOf[MailboxProbeImpl])
+ .appendMessageRetrieveAppendResult(BOB.asString, path,
AppendCommand.from(
+
ClassLoaderUtils.getSystemResourceAsSharedStream("eml/multipart_simple.eml")))
+ .getMessageAttachments
+ .get(1).getAttachmentId
+
+ val response = `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER).log().all()
+ .when
+ .get(s"/download/$accountId/${attachmentId.getId()}")
+ .`then`
+ .statusCode(SC_OK)
+ .contentType("application/vnd.ms-publisher; name=\"text2\"")
+ .extract
+ .body
+ .asString
+
+ assertThat(response)
+ .isEqualTo("ssh-rsa
AAAAB3NzaC1yc2EAAAADAQABAAABAQDHs8bT4T/8QymbsiAjlD1MwNIXJr/WET6+9MmuTSIYWWU94csDn9WVMzRhaAbpfnSqIx8TdUtrN/ZzX2JetPSar/bU9nXAWeiC/jPFQ1qKH4GeDrYXRLKu4T8782OrGH8Jyror97TlNXhPrjdRLEB4bQqmmZhb3HwcD8a9XzfZqlm7GRWLo1WQMGt/NpQLC7jMf4fA6/+kjzsTspxwdgL74GJqPfOXOiwgLHX8CZ6/5RyTqhT6pD3MktSNWaz/zIHPNEqf5BY9CBM1TFR5w+6MDHo0gmiIsXFEJTPnfhBvHDhSjB1RI0KxUClyYrJ4fBlUVeKfnawoVcu7YvCqF4F5
quynhnn@linagora\n")
+ }
+
@Test
def downloadMessageShouldFailWhenUnauthentified(server: GuiceJamesServer):
Unit = {
val path = MailboxPath.inbox(BOB)
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
index 997957f3ba..99bc6f4951 100644
---
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala
@@ -48,6 +48,7 @@ import org.apache.james.jmap.mail.{BlobId,
MinimalEmailBodyPart}
import org.apache.james.jmap.method.{AccountNotFoundException, ZoneIdProvider}
import org.apache.james.jmap.routes.DownloadRoutes.{BUFFER_SIZE, LOGGER}
import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
+import org.apache.james.mailbox.exception.AttachmentNotFoundException
import org.apache.james.mailbox.model.ContentType.{MediaType, MimeType,
SubType}
import org.apache.james.mailbox.model._
import org.apache.james.mailbox.{AttachmentIdFactory, AttachmentManager,
MailboxSession, MessageIdManager}
@@ -57,10 +58,11 @@ import org.apache.james.mime4j.codec.EncoderUtil.Usage
import org.apache.james.mime4j.dom.SingleBody
import org.apache.james.mime4j.message.DefaultMessageWriter
import org.apache.james.util.ReactorUtils
+import org.reactivestreams.Publisher
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json.Json
import reactor.core.publisher.Mono
-import reactor.core.scala.publisher.SMono
+import reactor.core.scala.publisher.{SFlux, SMono}
import reactor.core.scheduler.Schedulers
import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
@@ -84,7 +86,7 @@ case class Applicable(blob: SMono[Blob]) extends
BlobResolutionResult {
}
trait BlobResolver {
- def resolve(blobId: BlobId, mailboxSession: MailboxSession):
BlobResolutionResult
+ def resolve(blobId: BlobId, mailboxSession: MailboxSession):
Publisher[BlobResolutionResult]
}
trait Blob {
@@ -143,13 +145,13 @@ case class EmailBodyPartBlob(blobId: BlobId, part:
MinimalEmailBodyPart) extends
class MessageBlobResolver @Inject()(val messageIdFactory: MessageId.Factory,
val messageIdManager: MessageIdManager)
extends BlobResolver {
- override def resolve(blobId: BlobId, mailboxSession: MailboxSession):
BlobResolutionResult = {
+ override def resolve(blobId: BlobId, mailboxSession: MailboxSession):
SMono[BlobResolutionResult] = {
Try(messageIdFactory.fromString(blobId.value.value)) match {
- case Failure(_) => NonApplicable
- case Success(messageId) => Applicable(SMono.fromPublisher(
+ case Failure(_) => SMono.just(NonApplicable)
+ case Success(messageId) => SMono.just(Applicable(SMono.fromPublisher(
messageIdManager.getMessagesReactive(List(messageId).asJava,
FetchGroup.FULL_CONTENT, mailboxSession))
.map[Blob](MessageBlob(blobId, _))
- .switchIfEmpty(SMono.error(BlobNotFoundException(blobId))))
+ .switchIfEmpty(SMono.error(BlobNotFoundException(blobId)))))
}
}
}
@@ -157,12 +159,12 @@ class MessageBlobResolver @Inject()(val messageIdFactory:
MessageId.Factory,
class UploadResolver @Inject()(val uploadService: UploadService) extends
BlobResolver {
private val prefix = "uploads-"
- override def resolve(blobId: BlobId, mailboxSession: MailboxSession):
BlobResolutionResult = {
+ override def resolve(blobId: BlobId, mailboxSession: MailboxSession):
SMono[BlobResolutionResult] = {
if (!blobId.value.value.startsWith(prefix)) {
- NonApplicable
+ SMono.just(NonApplicable)
} else {
val uploadIdAsString = blobId.value.value.substring(prefix.length)
- Try(UploadId.from(uploadIdAsString)) match {
+ SMono.just(Try(UploadId.from(uploadIdAsString)) match {
case Failure(_) => NonApplicable
case Success(uploadId) => Applicable(
SMono(uploadService.retrieve(uploadId, mailboxSession.getUser))
@@ -170,22 +172,23 @@ class UploadResolver @Inject()(val uploadService:
UploadService) extends BlobRes
.onErrorResume {
case _: UploadNotFoundException =>
SMono.error(BlobNotFoundException(blobId))
})
- }
+ })
}
}
}
class AttachmentBlobResolver @Inject()(val attachmentManager:
AttachmentManager, val attachmentIdFactory: AttachmentIdFactory) extends
BlobResolver {
- override def resolve(blobId: BlobId, mailboxSession: MailboxSession):
BlobResolutionResult =
+ override def resolve(blobId: BlobId, mailboxSession: MailboxSession):
SMono[BlobResolutionResult] =
attachmentIdFactory.from(blobId.value.value) match {
case attachmentId: StringBackedAttachmentId =>
- Try(attachmentManager.getAttachment(attachmentId, mailboxSession))
match {
- case Success(attachmentMetadata) =>
-
Applicable(SMono(attachmentManager.loadReactive(attachmentMetadata,
mailboxSession))
- .map(content => AttachmentBlob(attachmentMetadata, content)))
- case Failure(_) => NonApplicable
- }
- case _ => NonApplicable
+ SMono(attachmentManager.getAttachmentReactive(attachmentId,
mailboxSession))
+ .map(attachmentMetadata =>
Applicable(SMono(attachmentManager.loadReactive(attachmentMetadata,
mailboxSession))
+ .map(content => AttachmentBlob(attachmentMetadata, content))))
+ .onErrorResume {
+ case e: AttachmentNotFoundException =>
SMono.just(NonApplicable.asInstanceOf[BlobResolutionResult])
+ case e => SMono.error[BlobResolutionResult](e)
+ }
+ case _ => SMono.just(NonApplicable)
}
}
@@ -207,11 +210,11 @@ class MessagePartBlobResolver @Inject()(val
messageIdFactory: MessageId.Factory,
case (acc, idPart) => acc.headOption.map(prefix => prefix + "_" +
idPart).getOrElse(idPart) :: acc
}.flatMap(s => BlobId.of(s).toOption).take(parts.size).reverse
- override def resolve(blobId: BlobId, mailboxSession: MailboxSession):
BlobResolutionResult = {
+ override def resolve(blobId: BlobId, mailboxSession: MailboxSession):
SMono[BlobResolutionResult] = {
asMessageAndPartIds(blobId) match {
- case Failure(_) => NonApplicable
+ case Failure(_) => SMono.just(NonApplicable)
case Success((messageId, blobIds)) =>
- Applicable(SMono.fromPublisher(
+ SMono.just(Applicable(SMono.fromPublisher(
messageIdManager.getMessagesReactive(List(messageId).asJava,
FetchGroup.FULL_CONTENT, mailboxSession))
.handle[MinimalEmailBodyPart] {
case (message, sink) => MinimalEmailBodyPart.ofMessage(None,
zoneIdSupplier.get(), BlobId.of(messageId).get, message)
@@ -227,7 +230,7 @@ class MessagePartBlobResolver @Inject()(val
messageIdFactory: MessageId.Factory,
.fold(sink.error(BlobNotFoundException(blobId)))(part =>
sink.next(part))
}
.map[Blob](EmailBodyPartBlob(blobId, _))
- .switchIfEmpty(SMono.error(BlobNotFoundException(blobId))))
+ .switchIfEmpty(SMono.error(BlobNotFoundException(blobId)))))
}
}
}
@@ -240,9 +243,14 @@ class BlobResolvers(blobResolvers: Set[BlobResolver]) {
}
def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[Blob] =
- blobResolvers.flatMap(resolver => resolver.resolve(blobId,
mailboxSession).asOption)
- .headOption
- .getOrElse(SMono.error(BlobNotFoundException(blobId)))
+ SFlux.fromIterable(blobResolvers)
+ .concatMap(resolver => resolver.resolve(blobId, mailboxSession))
+ .filter {
+ case NonApplicable => false
+ case _: Applicable => true
+ }
+ .concatMap(result =>
result.asOption.getOrElse(SMono.error(BlobNotFoundException(blobId))))
+ .next().switchIfEmpty(SMono.error(BlobNotFoundException(blobId)))
}
class DownloadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val
authenticator: Authenticator,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]