This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 7f6003f5ddc5540c56622f99579fb59f6af5106b Author: duc91 <[email protected]> AuthorDate: Thu Oct 22 18:11:47 2020 +0700 JAMES-3432 Upload: Attachment --- .../james/jmap/rfc8621/RFC8621MethodsModule.java | 5 +- .../rfc8621/distributed/DistributedUploadTest.java | 53 ++++++++ .../james/jmap/rfc8621/contract/Fixture.scala | 2 + .../jmap/rfc8621/contract/UploadContract.scala | 131 ++++++++++++++++++++ .../apache/james/jmap/json/UploadSerializer.scala | 15 +++ .../apache/james/jmap/routes/DownloadRoutes.scala | 39 ++++-- .../apache/james/jmap/routes/UploadRoutes.scala | 133 +++++++++++++++++++++ 7 files changed, 368 insertions(+), 10 deletions(-) diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java index 535f1d5..13eeaed 100644 --- a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java +++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java @@ -47,6 +47,7 @@ import org.apache.james.jmap.method.VacationResponseSetMethod; import org.apache.james.jmap.method.ZoneIdProvider; import org.apache.james.jmap.model.JmapRfc8621Configuration; import org.apache.james.jmap.routes.DownloadRoutes; +import org.apache.james.jmap.routes.UploadRoutes; import org.apache.james.jmap.routes.JMAPApiRoutes; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.utils.PropertiesProvider; @@ -80,8 +81,8 @@ public class RFC8621MethodsModule extends AbstractModule { } @ProvidesIntoSet - JMAPRoutesHandler routesHandler(SessionRoutes sessionRoutes, JMAPApiRoutes jmapApiRoutes, DownloadRoutes downloadRoutes) { - return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes); + JMAPRoutesHandler routesHandler(SessionRoutes sessionRoutes, JMAPApiRoutes jmapApiRoutes, DownloadRoutes downloadRoutes, UploadRoutes uploadRoutes) { + return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes, uploadRoutes); } @Provides diff --git a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedUploadTest.java b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedUploadTest.java new file mode 100644 index 0000000..9d432db --- /dev/null +++ b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedUploadTest.java @@ -0,0 +1,53 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.rfc8621.distributed; + +import org.apache.james.CassandraExtension; +import org.apache.james.CassandraRabbitMQJamesConfiguration; +import org.apache.james.CassandraRabbitMQJamesServerMain; +import org.apache.james.DockerElasticSearchExtension; +import org.apache.james.JamesServerBuilder; +import org.apache.james.JamesServerExtension; +import org.apache.james.jmap.rfc8621.contract.UploadContract; +import org.apache.james.modules.AwsS3BlobStoreExtension; +import org.apache.james.modules.RabbitMQExtension; +import org.apache.james.modules.TestJMAPServerModule; +import org.apache.james.modules.blobstore.BlobStoreConfiguration; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class DistributedUploadTest implements UploadContract { + @RegisterExtension + static JamesServerExtension testExtension = new JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir -> + CassandraRabbitMQJamesConfiguration.builder() + .workingDirectory(tmpDir) + .configurationFromClasspath() + .blobStore(BlobStoreConfiguration.builder() + .s3() + .disableCache() + .deduplication()) + .build()) + .extension(new DockerElasticSearchExtension()) + .extension(new CassandraExtension()) + .extension(new RabbitMQExtension()) + .extension(new AwsS3BlobStoreExtension()) + .server(configuration -> CassandraRabbitMQJamesServerMain.createServer(configuration) + .overrideWith(new TestJMAPServerModule())) + .build(); +} diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/Fixture.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/Fixture.scala index 4b70b04..6b2a9fb 100644 --- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/Fixture.scala +++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/Fixture.scala @@ -38,6 +38,7 @@ import org.apache.james.mime4j.dom.Message object Fixture { val ACCOUNT_ID: String = "29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6" + val ALICE_ACCOUNT_ID: String = "2bd806c97f0e00af1a1fc3328fa763a9269723c8db8fac4f93af71db186d6e90" def createTestMessage: Message = Message.Builder .of @@ -140,6 +141,7 @@ object Fixture { |}""".stripMargin val ACCEPT_RFC8621_VERSION_HEADER: String = "application/json; jmapVersion=rfc-8621" + val RFC8621_VERSION_HEADER: String = "jmapVersion=rfc-8621" val USER: Username = Username.fromLocalPartWithDomain("user", DOMAIN) val USER_PASSWORD: String = "user" diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala new file mode 100644 index 0000000..9605f44 --- /dev/null +++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/UploadContract.scala @@ -0,0 +1,131 @@ +package org.apache.james.jmap.rfc8621.contract + +import java.io.{ByteArrayInputStream, InputStream} +import java.nio.charset.StandardCharsets +import io.netty.handler.codec.http.HttpHeaderNames.ACCEPT +import io.restassured.RestAssured.{`given`, requestSpecification} +import io.restassured.http.ContentType +import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson +import org.apache.commons.io.IOUtils +import org.apache.http.HttpStatus.{SC_CREATED, SC_NOT_FOUND, SC_OK, SC_UNAUTHORIZED} +import org.apache.james.GuiceJamesServer +import org.apache.james.jmap.http.UserCredential +import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, BOB, BOB_PASSWORD, DOMAIN, RFC8621_VERSION_HEADER, authScheme, baseRequestSpecBuilder} +import org.apache.james.jmap.rfc8621.contract.UploadContract.{BIG_INPUT_STREAM, VALID_INPUT_STREAM} +import org.apache.james.utils.DataProbeImpl +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.{BeforeEach, Test} +import play.api.libs.json.{JsString, Json} + +object UploadContract { + private val BIG_INPUT_STREAM: InputStream = new ByteArrayInputStream("123456789\r\n".repeat(10025).getBytes) + private val VALID_INPUT_STREAM: InputStream = new ByteArrayInputStream("123456789\r\n".repeat(1).getBytes) +} + +trait UploadContract { + @BeforeEach + def setUp(server: GuiceJamesServer): Unit = { + server.getProbe(classOf[DataProbeImpl]) + .fluent + .addDomain(DOMAIN.asString) + .addUser(BOB.asString, BOB_PASSWORD) + + requestSpecification = baseRequestSpecBuilder(server) + .setAuth(authScheme(UserCredential(BOB, BOB_PASSWORD))) + .build + } + + @Test + def shouldUploadFileAndOnlyOwnerCanAccess(): Unit = { + val uploadResponse: String = `given` + .basePath("") + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .body(VALID_INPUT_STREAM) + .when + .post(s"/upload/$ACCOUNT_ID/") + .`then` + .statusCode(SC_CREATED) + .extract + .body + .asString + + val blobId: String = Json.parse(uploadResponse).\("blobId").get.asInstanceOf[JsString].value + + val downloadResponse: String = `given` + .basePath("") + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .when + .get(s"/download/$ACCOUNT_ID/$blobId") + .`then` + .statusCode(SC_OK) + .extract + .body + .asString + + val expectedResponse: String = IOUtils.toString(VALID_INPUT_STREAM, StandardCharsets.UTF_8) + + assertThat(new ByteArrayInputStream(downloadResponse.getBytes(StandardCharsets.UTF_8))) + .hasContent(expectedResponse) + } + + @Test + def shouldRejectWhenUploadFileTooBig(): Unit = { + val response: String = `given` + .basePath("") + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .contentType(ContentType.BINARY) + .body(BIG_INPUT_STREAM) + .when + .post(s"/upload/$ACCOUNT_ID/") + .`then` + .statusCode(SC_OK) + .extract + .body + .asString + + // fixme: dont know we limit size or not? + assertThatJson(response) + .isEqualTo("Should be error") + } + + @Test + def uploadShouldRejectWhenUnauthenticated(): Unit = { + `given` + .auth() + .none() + .basePath("") + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .contentType(ContentType.BINARY) + .body(VALID_INPUT_STREAM) + .when + .post(s"/upload/$ACCOUNT_ID/") + .`then` + .statusCode(SC_UNAUTHORIZED) + } + + @Test + def uploadShouldSucceedButExpiredWhenDownload(): Unit = { + val uploadResponse: String = `given` + .basePath("") + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .body(VALID_INPUT_STREAM) + .when + .post(s"/upload/$ACCOUNT_ID/") + .`then` + .statusCode(SC_CREATED) + .extract + .body + .asString + + val blobId: String = Json.parse(uploadResponse).\("blobId").get.asInstanceOf[JsString].value + + // fixme: dont know how to delete file with existing attachment api + `given` + .basePath("") + .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER) + .when + .get(s"/download/$ACCOUNT_ID/$blobId") + .`then` + .statusCode(SC_NOT_FOUND) + } +} diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala new file mode 100644 index 0000000..a7f8b25 --- /dev/null +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/UploadSerializer.scala @@ -0,0 +1,15 @@ +package org.apache.james.jmap.json + +import org.apache.james.jmap.mail.BlobId +import org.apache.james.jmap.routes.UploadResponse +import org.apache.james.mailbox.model.ContentType +import play.api.libs.json.{JsString, JsValue, Json, Writes} + +class UploadSerializer { + + private implicit val blobIdWrites: Writes[BlobId] = Json.valueWrites[BlobId] + private implicit val contentTypeWrites: Writes[ContentType] = contentType => JsString(contentType.asString()) + private implicit val uploadResponseWrites: Writes[UploadResponse] = Json.writes[UploadResponse] + + def serialize(uploadResponse: UploadResponse): JsValue = Json.toJson(uploadResponse)(uploadResponseWrites) +} diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala index 03af576..f7c580e 100644 --- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/DownloadRoutes.scala @@ -1,4 +1,4 @@ -/** ************************************************************** +/**************************************************************** * Licensed to the Apache Software Foundation (ASF) under one * * or more contributor license agreements. See the NOTICE file * * distributed with this work for additional information * @@ -6,16 +6,16 @@ * to you under the Apache License, Version 2.0 (the * * "License"); you may not use this file except in compliance * * with the License. You may obtain a copy of the License at * - * * - * http://www.apache.org/licenses/LICENSE-2.0 * - * * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * * Unless required by applicable law or agreed to in writing, * * software distributed under the License is distributed on an * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * * KIND, either express or implied. See the License for the * * specific language governing permissions and limitations * * under the License. * - * ***************************************************************/ + ****************************************************************/ package org.apache.james.jmap.routes import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream} @@ -38,8 +38,8 @@ import org.apache.james.jmap.mail.Email.Size import org.apache.james.jmap.mail.{BlobId, EmailBodyPart, PartId} import org.apache.james.jmap.routes.DownloadRoutes.{BUFFER_SIZE, LOGGER} import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes} -import org.apache.james.mailbox.model.{ContentType, FetchGroup, MessageId, MessageResult} -import org.apache.james.mailbox.{MailboxSession, MessageIdManager} +import org.apache.james.mailbox.model.{AttachmentId, AttachmentMetadata, ContentType, FetchGroup, MessageId, MessageResult} +import org.apache.james.mailbox.{AttachmentManager, MailboxSession, MessageIdManager} import org.apache.james.mime4j.codec.EncoderUtil import org.apache.james.mime4j.codec.EncoderUtil.Usage import org.apache.james.mime4j.message.DefaultMessageWriter @@ -94,6 +94,16 @@ case class MessageBlob(blobId: BlobId, message: MessageResult) extends Blob { override def content: InputStream = message.getFullContent.getInputStream } +case class AttachmentBlob(attachmentMetadata: AttachmentMetadata, fileContent: InputStream) extends Blob { + override def size: Try[Size] = Success(UploadRoutes.sanitizeSize(attachmentMetadata.getSize)) + + override def contentType: ContentType = attachmentMetadata.getType + + override def content: InputStream = fileContent + + override def blobId: BlobId = BlobId.of(attachmentMetadata.getAttachmentId.getId).get +} + case class EmailBodyPartBlob(blobId: BlobId, part: EmailBodyPart) extends Blob { override def size: Try[Size] = Success(part.size) @@ -120,6 +130,17 @@ class MessageBlobResolver @Inject()(val messageIdFactory: MessageId.Factory, } } +class AttachmentBlobResolver @Inject()(val attachmentManager: AttachmentManager) extends BlobResolver { + override def resolve(blobId: BlobId, mailboxSession: MailboxSession): BlobResolutionResult = + AttachmentId.from(org.apache.james.mailbox.model.BlobId.fromString(blobId.value.value)) match { + case attachmentId: AttachmentId => Applicable( + SMono.fromCallable(() => attachmentManager.getAttachment(attachmentId, mailboxSession)) + .map((attachmentMetadata: AttachmentMetadata) => AttachmentBlob(attachmentMetadata, attachmentManager.load(attachmentMetadata, mailboxSession))) + ) + case _ => NonApplicable() + } +} + class MessagePartBlobResolver @Inject()(val messageIdFactory: MessageId.Factory, val messageIdManager: MessageIdManager) extends BlobResolver { private def asMessageAndPartId(blobId: BlobId): Try[(MessageId, PartId)] = { @@ -153,11 +174,13 @@ class MessagePartBlobResolver @Inject()(val messageIdFactory: MessageId.Factory, } class BlobResolvers @Inject()(val messageBlobResolver: MessageBlobResolver, - val messagePartBlobResolver: MessagePartBlobResolver) { + val messagePartBlobResolver: MessagePartBlobResolver, + val attachmentBlobResolver: AttachmentBlobResolver) { def resolve(blobId: BlobId, mailboxSession: MailboxSession): SMono[Blob] = messageBlobResolver .resolve(blobId, mailboxSession).asOption .orElse(messagePartBlobResolver.resolve(blobId, mailboxSession).asOption) + .orElse(attachmentBlobResolver.resolve(blobId, mailboxSession).asOption) .getOrElse(SMono.raiseError(BlobNotFoundException(blobId))) } diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala new file mode 100644 index 0000000..f06fa51 --- /dev/null +++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala @@ -0,0 +1,133 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.jmap.routes + +import java.io.InputStream +import java.time.ZonedDateTime +import java.util.stream +import java.util.stream.Stream + +import eu.timepit.refined.api.Refined +import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE +import io.netty.handler.codec.http.HttpResponseStatus.{BAD_REQUEST, CREATED} +import io.netty.handler.codec.http.HttpMethod +import javax.inject.{Inject, Named} +import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes} +import org.apache.james.jmap.http.Authenticator +import org.apache.james.jmap.http.rfc8621.InjectionKeys +import org.apache.james.jmap.mail.Email.Size +import org.apache.james.jmap.routes.UploadRoutes.{LOGGER, fromAttachment} +import org.apache.james.mailbox.{AttachmentManager, MailboxSession} +import org.apache.james.mailbox.model.{AttachmentMetadata, ContentType} +import org.apache.james.util.ReactorUtils +import org.slf4j.{Logger, LoggerFactory} +import reactor.core.publisher.Mono +import reactor.core.scala.publisher.SMono +import reactor.core.scheduler.Schedulers +import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse} +import eu.timepit.refined.auto._ +import eu.timepit.refined.numeric.NonNegative +import eu.timepit.refined.refineV +import org.apache.james.jmap.exceptions.UnauthorizedException +import org.apache.james.jmap.json.UploadSerializer +import org.apache.james.jmap.mail.BlobId + +object UploadRoutes { + val LOGGER: Logger = LoggerFactory.getLogger(classOf[DownloadRoutes]) + + type Size = Long Refined NonNegative + val Zero: Size = 0L + + def sanitizeSize(value: Long): Size = { + val size: Either[String, Size] = refineV[NonNegative](value) + size.fold(e => { + LOGGER.error(s"Encountered an invalid Email size: $e") + Zero + }, + refinedValue => refinedValue) + } + + def fromAttachment(attachmentMetadata: AttachmentMetadata): UploadResponse = + UploadResponse( + blobId = BlobId.of(attachmentMetadata.getAttachmentId.getId).get, + `type` = ContentType.of(attachmentMetadata.getType.asString), + size = sanitizeSize(attachmentMetadata.getSize), + expires = None) +} + +case class UploadResponse(blobId: BlobId, + `type`: ContentType, + size: Size, + expires: Option[ZonedDateTime]) + +class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator, + val attachmentManager: AttachmentManager, + val serializer: UploadSerializer) extends JMAPRoutes { + + class CancelledUploadException extends RuntimeException { + + } + + private val accountIdParam: String = "accountId" + private val uploadURI = s"/upload/{$accountIdParam}/" + + override def routes(): stream.Stream[JMAPRoute] = Stream.of( + JMAPRoute.builder + .endpoint(new Endpoint(HttpMethod.POST, uploadURI)) + .action(this.post) + .corsHeaders, + JMAPRoute.builder + .endpoint(new Endpoint(HttpMethod.OPTIONS, uploadURI)) + .action(JMAPRoutes.CORS_CONTROL) + .noCorsHeaders) + + def post(request: HttpServerRequest, response: HttpServerResponse): Mono[Void] = { + request.requestHeaders.get(CONTENT_TYPE) match { + case contentType => SMono.fromPublisher( + authenticator.authenticate(request)) + .flatMap(session => post(request, response, ContentType.of(contentType), session)) + .onErrorResume { + case e: UnauthorizedException => SMono.fromPublisher(handleAuthenticationFailure(response, LOGGER, e)) + case e: Throwable => SMono.fromPublisher(handleInternalError(response, LOGGER, e)) + } + .asJava().`then`() + case _ => response.status(BAD_REQUEST).send + } + } + + def post(request: HttpServerRequest, response: HttpServerResponse, contentType: ContentType, session: MailboxSession): SMono[Void] = { + SMono.fromCallable(() => ReactorUtils.toInputStream(request.receive.asByteBuffer)) + .flatMap(content => handle(contentType, content, session, response)) + .subscribeOn(Schedulers.elastic()) + } + + def handle(contentType: ContentType, content: InputStream, mailboxSession: MailboxSession, response: HttpServerResponse): SMono[Void] = + uploadContent(contentType, content, mailboxSession) + .flatMap(uploadResponse => SMono.fromPublisher(response + .header(CONTENT_TYPE, uploadResponse.`type`.asString()) + .status(CREATED) + .sendString(SMono.just(serializer.serialize(uploadResponse).toString())))) + + def uploadContent(contentType: ContentType, inputStream: InputStream, session: MailboxSession): SMono[UploadResponse] = + SMono + .fromPublisher(attachmentManager.storeAttachment(contentType, inputStream, session)) + .map(fromAttachment) + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
