This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d46f2f4f6 JAMES-4157 Implement Blob/copy (#2896)
2d46f2f4f6 is described below

commit 2d46f2f4f6c2911a479decc5c064add0c75e8795
Author: Trần Hồng Quân <[email protected]>
AuthorDate: Wed Jan 7 09:11:45 2026 +0700

    JAMES-4157 Implement Blob/copy (#2896)
---
 .../james/jmap/rfc8621/RFC8621MethodsModule.java   |   2 +
 .../distributed/DistributedBlobCopyTest.java       |  62 +++
 .../jmap/rfc8621/contract/BlobCopyContract.scala   | 617 +++++++++++++++++++++
 .../jmap/rfc8621/memory/MemoryBlobCopyTest.java    |  49 ++
 .../rfc8621/postgres/PostgresBlobCopyTest.java     |  61 ++
 .../jmap-rfc-8621/doc/specs/spec/jmap/binary.mdown |   4 +-
 .../org/apache/james/jmap/core/BlobCopy.scala      |  40 ++
 .../org/apache/james/jmap/core/Invocation.scala    |   4 +
 .../james/jmap/json/BlobCopySerializer.scala       |  41 ++
 .../apache/james/jmap/method/BlobCopyMethod.scala  | 148 +++++
 .../apache/james/jmap/routes/UploadRoutes.scala    |   9 +-
 11 files changed, 1031 insertions(+), 6 deletions(-)

diff --git 
a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
 
b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
index 8141b9093b..66437c8c70 100644
--- 
a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
+++ 
b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
@@ -52,6 +52,7 @@ import org.apache.james.jmap.http.rfc8621.InjectionKeys;
 import org.apache.james.jmap.mail.DefaultNamespaceFactory;
 import org.apache.james.jmap.mail.NamespaceFactory;
 import org.apache.james.jmap.mail.SortOrderProvider;
+import org.apache.james.jmap.method.BlobCopyMethod;
 import org.apache.james.jmap.method.CoreEchoMethod;
 import org.apache.james.jmap.method.DelegateGetMethod;
 import org.apache.james.jmap.method.DelegateSetMethod;
@@ -141,6 +142,7 @@ public class RFC8621MethodsModule extends AbstractModule {
 
         Multibinder<Method> methods = Multibinder.newSetBinder(binder(), 
Method.class);
         methods.addBinding().to(CoreEchoMethod.class);
+        methods.addBinding().to(BlobCopyMethod.class);
         methods.addBinding().to(EmailChangesMethod.class);
         methods.addBinding().to(EmailImportMethod.class);
         methods.addBinding().to(EmailGetMethod.class);
diff --git 
a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedBlobCopyTest.java
 
b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedBlobCopyTest.java
new file mode 100644
index 0000000000..1f5468a9d3
--- /dev/null
+++ 
b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedBlobCopyTest.java
@@ -0,0 +1,62 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *  http://www.apache.org/licenses/LICENSE-2.0                  *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.distributed;
+
+import org.apache.james.CassandraExtension;
+import org.apache.james.CassandraRabbitMQJamesConfiguration;
+import org.apache.james.CassandraRabbitMQJamesServerMain;
+import org.apache.james.DockerOpenSearchExtension;
+import org.apache.james.JamesServerBuilder;
+import org.apache.james.JamesServerExtension;
+import org.apache.james.SearchConfiguration;
+import org.apache.james.jmap.rfc8621.contract.BlobCopyContract;
+import org.apache.james.jmap.rfc8621.contract.BlobCopyContract$;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
+import org.apache.james.modules.AwsS3BlobStoreExtension;
+import org.apache.james.modules.RabbitMQExtension;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.apache.james.modules.blobstore.BlobStoreConfiguration;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableMap;
+
+public class DistributedBlobCopyTest implements BlobCopyContract {
+    @RegisterExtension
+    static JamesServerExtension testExtension = new 
JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir ->
+        CassandraRabbitMQJamesConfiguration.builder()
+            .workingDirectory(tmpDir)
+            .configurationFromClasspath()
+            .enableJMAP()
+            .blobStore(BlobStoreConfiguration.builder()
+                .s3()
+                .disableCache()
+                .deduplication()
+                .noCryptoConfig())
+            .searchConfiguration(SearchConfiguration.openSearch())
+            .build())
+        .extension(new DockerOpenSearchExtension())
+        .extension(new CassandraExtension())
+        .extension(new RabbitMQExtension())
+        .extension(new AwsS3BlobStoreExtension())
+        .server(configuration -> 
CassandraRabbitMQJamesServerMain.createServer(configuration)
+            .overrideWith(new 
TestJMAPServerModule(ImmutableMap.of("upload.quota.limit", 
BlobCopyContract$.MODULE$.TWENTY_KILO_BYTES_UPLOAD_QUOTA_LIMIT())),
+                new DelegationProbeModule()))
+        .build();
+}
diff --git 
a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/BlobCopyContract.scala
 
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/BlobCopyContract.scala
new file mode 100644
index 0000000000..03abf2c1d7
--- /dev/null
+++ 
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/BlobCopyContract.scala
@@ -0,0 +1,617 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *  http://www.apache.org/licenses/LICENSE-2.0                  *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.contract
+
+import java.io.ByteArrayInputStream
+import java.nio.charset.StandardCharsets
+
+import com.google.common.base.Strings
+import io.netty.handler.codec.http.HttpHeaderNames.ACCEPT
+import io.restassured.RestAssured.{`given`, requestSpecification}
+import io.restassured.http.ContentType
+import io.restassured.path.json.JsonPath
+import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
+import org.apache.http.HttpStatus.{SC_CREATED, SC_OK}
+import org.apache.james.GuiceJamesServer
+import org.apache.james.jmap.core.AccountId
+import org.apache.james.jmap.http.UserCredential
+import 
org.apache.james.jmap.rfc8621.contract.BlobCopyContract.{ALICE_ACCOUNT_ID, 
TEN_KILO_BYTES}
+import 
org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, 
ALICE, ALICE_PASSWORD, ANDRE, ANDRE_ACCOUNT_ID, ANDRE_PASSWORD, BOB, 
BOB_PASSWORD, DOMAIN, _2_DOT_DOMAIN, authScheme, baseRequestSpecBuilder, 
ACCOUNT_ID => BOB_ACCOUNT_ID}
+import org.apache.james.junit.categories.BasicFeature
+import org.apache.james.utils.DataProbeImpl
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.experimental.categories.Category
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+object BlobCopyContract {
+  val TWENTY_KILO_BYTES_UPLOAD_QUOTA_LIMIT: String = "20K"
+  val TEN_KILO_BYTES: Array[Byte] = Strings.repeat("0123456789\r\n", 
853).getBytes(StandardCharsets.UTF_8)
+  val ALICE_ACCOUNT_ID: String = AccountId.from(ALICE).toOption.get.id.value
+}
+
+trait BlobCopyContract {
+  @BeforeEach
+  def setUp(server: GuiceJamesServer): Unit = {
+    server.getProbe(classOf[DataProbeImpl])
+      .fluent
+      .addDomain(DOMAIN.asString)
+      .addDomain(_2_DOT_DOMAIN.asString())
+      .addUser(BOB.asString, BOB_PASSWORD)
+      .addUser(ALICE.asString(), ALICE_PASSWORD)
+      .addUser(ANDRE.asString(), ANDRE_PASSWORD)
+
+    requestSpecification = baseRequestSpecBuilder(server)
+      .setAuth(authScheme(UserCredential(BOB, BOB_PASSWORD)))
+      .build
+  }
+
+  @Category(Array(classOf[BasicFeature]))
+  @Test
+  def shouldCopyBlobBetweenAccountsWhenDelegated(server: GuiceJamesServer): 
Unit = {
+    // Alice delegates Bob to access her account
+    server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ALICE, BOB)
+
+    // Bob uploads a blob to his account
+    val bobBlobId: String = `given`
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .contentType(ContentType.BINARY)
+      .body(TEN_KILO_BYTES)
+    .when
+      .post(s"/upload/$BOB_ACCOUNT_ID")
+    .`then`
+      .statusCode(SC_CREATED)
+      .extract
+      .jsonPath()
+      .getString("blobId")
+
+    // Bob copies the blob from his account to Alice's account
+    val request: String =
+      s"""{
+         |  "using": [ "urn:ietf:params:jmap:core" ],
+         |  "methodCalls": [[
+         |    "Blob/copy",
+         |    {
+         |      "fromAccountId": "$BOB_ACCOUNT_ID",
+         |      "accountId": "$ALICE_ACCOUNT_ID",
+         |      "blobIds": [ "$bobBlobId" ]
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+
+    val copiedBlobId: String = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(ContentType.JSON)
+      .extract
+      .jsonPath()
+      .getString(s"methodResponses[0][1].copied.$bobBlobId")
+
+    // Alice downloads the copied blob from her own account (preemptive auth, as in the other tests)
+    val downloadResponse: Array[Byte] = `given`
+      .auth().preemptive().basic(ALICE.asString(), ALICE_PASSWORD)
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+    .when
+      .get(s"/download/$ALICE_ACCOUNT_ID/$copiedBlobId")
+    .`then`
+      .statusCode(SC_OK)
+      .extract
+      .body
+      .asByteArray()
+
+    assertThat(new ByteArrayInputStream(downloadResponse))
+      .hasBinaryContent(TEN_KILO_BYTES)
+  }
+
+  @Test
+  def shouldCopyBlobFromAliceToBobWhenBobDelegated(server: GuiceJamesServer): 
Unit = {
+    // Alice delegates Bob to access her account
+    server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ALICE, BOB)
+
+    // Alice uploads a blob to her account
+    val aliceBlobId: String = `given`
+      .auth().preemptive().basic(ALICE.asString(), ALICE_PASSWORD)
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .contentType(ContentType.BINARY)
+      .body(TEN_KILO_BYTES)
+    .when
+      .post(s"/upload/$ALICE_ACCOUNT_ID")
+    .`then`
+      .statusCode(SC_CREATED)
+      .extract
+      .jsonPath()
+      .getString("blobId")
+
+    // Bob copies the blob from Alice's account to his account
+    val request: String =
+      s"""{
+         |  "using": [ "urn:ietf:params:jmap:core" ],
+         |  "methodCalls": [[
+         |    "Blob/copy",
+         |    {
+         |      "fromAccountId": "$ALICE_ACCOUNT_ID",
+         |      "accountId": "$BOB_ACCOUNT_ID",
+         |      "blobIds": [ "$aliceBlobId" ]
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+
+    val copiedBlobId: String = `given`
+      .auth().preemptive().basic(BOB.asString(), BOB_PASSWORD)
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(ContentType.JSON)
+      .extract
+      .jsonPath()
+      .getString(s"methodResponses[0][1].copied.$aliceBlobId")
+
+    // Bob downloads the copied blob from his account
+    val downloadResponse: Array[Byte] = `given`
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+    .when
+      .get(s"/download/$BOB_ACCOUNT_ID/$copiedBlobId")
+    .`then`
+      .statusCode(SC_OK)
+      .extract
+      .body
+      .asByteArray()
+
+    assertThat(new ByteArrayInputStream(downloadResponse))
+      .hasBinaryContent(TEN_KILO_BYTES)
+  }
+
+  @Test
+  def transitiveDelegationShouldNotWorkForTargetAccountId(server: 
GuiceJamesServer): Unit = {
+    // Andre delegates to Alice; Alice delegates to Bob
+    server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ANDRE, ALICE)
+    server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ALICE, BOB)
+
+    // Bob uploads a blob to his own account, which is the copy source below
+    val bobBlobId: String = `given`
+      .auth().preemptive().basic(BOB.asString(), BOB_PASSWORD)
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .contentType(ContentType.BINARY)
+      .body(TEN_KILO_BYTES)
+    .when
+      .post(s"/upload/$BOB_ACCOUNT_ID")
+    .`then`
+      .statusCode(SC_CREATED)
+      .extract
+      .jsonPath()
+      .getString("blobId")
+
+    // Bob tries to copy from his account to Andre (transitive delegation 
should not work)
+    val request: String =
+      s"""{
+         |  "using": [ "urn:ietf:params:jmap:core" ],
+         |  "methodCalls": [[
+         |    "Blob/copy",
+         |    {
+         |      "fromAccountId": "$BOB_ACCOUNT_ID",
+         |      "accountId": "$ANDRE_ACCOUNT_ID",
+         |      "blobIds": [ "$bobBlobId" ]
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+
+    val response: String = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(ContentType.JSON)
+      .extract
+      .body
+      .asString
+
+    assertThatJson(response)
+      .inPath("methodResponses")
+      .isEqualTo(
+        """[["error",{"type":"accountNotFound"},"c1"]]""")
+  }
+
+  @Test
+  def transitiveDelegationShouldNotWorkForFromAccountId(server: 
GuiceJamesServer): Unit = {
+    // Andre delegates to Alice; Alice delegates to Bob
+    server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ANDRE, ALICE)
+    server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ALICE, BOB)
+
+    // Andre uploads a blob to his account
+    val andreBlobId: String = `given`
+      .auth().preemptive().basic(ANDRE.asString(), ANDRE_PASSWORD)
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .contentType(ContentType.BINARY)
+      .body(TEN_KILO_BYTES)
+    .when
+      .post(s"/upload/$ANDRE_ACCOUNT_ID")
+    .`then`
+      .statusCode(SC_CREATED)
+      .extract
+      .jsonPath()
+      .getString("blobId")
+
+    // Bob tries to copy blob from Andre to his account (transitive delegation 
should fail)
+    val request: String =
+      s"""{
+         |  "using": [ "urn:ietf:params:jmap:core" ],
+         |  "methodCalls": [[
+         |    "Blob/copy",
+         |    {
+         |      "fromAccountId": "$ANDRE_ACCOUNT_ID",
+         |      "accountId": "$BOB_ACCOUNT_ID",
+         |      "blobIds": [ "$andreBlobId" ]
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+
+    val response: String = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(ContentType.JSON)
+      .extract
+      .body
+      .asString
+
+    assertThatJson(response)
+      .inPath("methodResponses")
+      .isEqualTo(
+        """[["error",{"type":"fromAccountNotFound"},"c1"]]""")
+  }
+
+  @Test
+  def copyBlobShouldSucceedWhenUploadQuotaExceeded(): Unit = {
+    // Given upload quota is 20KB, upload 2 x 10KB blobs to reach the upload 
quota
+    `given`
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .contentType(ContentType.BINARY)
+      .body(TEN_KILO_BYTES)
+    .when
+      .post(s"/upload/$BOB_ACCOUNT_ID")
+    .`then`
+      .statusCode(SC_CREATED)
+      .extract
+      .jsonPath()
+      .getString("blobId")
+
+    val secondBlobId: String = `given`
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .contentType(ContentType.BINARY)
+      .body(TEN_KILO_BYTES)
+    .when
+      .post(s"/upload/$BOB_ACCOUNT_ID")
+    .`then`
+      .statusCode(SC_CREATED)
+      .extract
+      .jsonPath()
+      .getString("blobId")
+
+    // Upload quota should be reached, now copy 10KB blob to the same account
+    val request: String =
+      s"""{
+         |  "using": [ "urn:ietf:params:jmap:core" ],
+         |  "methodCalls": [[
+         |    "Blob/copy",
+         |    {
+         |      "fromAccountId": "$BOB_ACCOUNT_ID",
+         |      "accountId": "$BOB_ACCOUNT_ID",
+         |      "blobIds": [ "$secondBlobId" ]
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+
+    val copiedBlobId: String = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(ContentType.JSON)
+      .extract
+      .jsonPath()
+      .getString(s"methodResponses[0][1].copied.$secondBlobId")
+
+    // Download the copied blob should succeed
+    val downloadResponse: Array[Byte] = `given`
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+    .when
+      .get(s"/download/$BOB_ACCOUNT_ID/$copiedBlobId")
+    .`then`
+      .statusCode(SC_OK)
+      .extract
+      .body
+      .asByteArray()
+    assertThat(new ByteArrayInputStream(downloadResponse))
+      .hasBinaryContent(TEN_KILO_BYTES)
+  }
+
+  @Test
+  def shouldReturnNotFoundForNonExistingBlob(): Unit = {
+    val notFoundBlobId: String = "notFoundBlobId"
+
+    val request: String =
+      s"""{
+         |  "using": [ "urn:ietf:params:jmap:core" ],
+         |  "methodCalls": [[
+         |    "Blob/copy",
+         |    {
+         |      "fromAccountId": "$BOB_ACCOUNT_ID",
+         |      "accountId": "$BOB_ACCOUNT_ID",
+         |      "blobIds": [ "$notFoundBlobId" ]
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+
+    val response: String = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(ContentType.JSON)
+      .extract
+      .body
+      .asString
+
+    assertThatJson(response)
+      .inPath("methodResponses")
+      .isEqualTo(
+        s"""[[
+           |  "Blob/copy",
+           |  {
+           |    "fromAccountId":"$BOB_ACCOUNT_ID",
+           |    "accountId":"$BOB_ACCOUNT_ID",
+           |    "notCopied":{
+           |      "$notFoundBlobId":{
+           |        "type":"notFound",
+           |        "description":"Blob BlobId($notFoundBlobId) could not be 
found"
+           |      }
+           |    }
+           |  },
+           |  "c1"
+           |]]""".stripMargin)
+  }
+
+  @Test
+  def shouldReturnCopiedAndNotCopiedWhenMixingExistingAndMissingBlobs(): Unit 
= {
+    val existingBlobId: String = `given`
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .contentType(ContentType.BINARY)
+      .body(TEN_KILO_BYTES)
+    .when
+      .post(s"/upload/$BOB_ACCOUNT_ID")
+    .`then`
+      .statusCode(SC_CREATED)
+      .extract
+      .jsonPath()
+      .getString("blobId")
+
+    val notFoundBlobId: String = "notFoundBlobId"
+
+    val request: String =
+      s"""{
+         |  "using": [ "urn:ietf:params:jmap:core" ],
+         |  "methodCalls": [[
+         |    "Blob/copy",
+         |    {
+         |      "fromAccountId": "$BOB_ACCOUNT_ID",
+         |      "accountId": "$BOB_ACCOUNT_ID",
+         |      "blobIds": [ "$existingBlobId", "$notFoundBlobId" ]
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+
+    val response: String = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(ContentType.JSON)
+      .extract
+      .body
+      .asString
+
+    val copiedBlobId: String = 
JsonPath.from(response).getString(s"methodResponses[0][1].copied.$existingBlobId")
+
+    assertThatJson(response)
+      .inPath("methodResponses")
+      .isEqualTo(
+        s"""[[
+           |  "Blob/copy",
+           |  {
+           |    "fromAccountId":"$BOB_ACCOUNT_ID",
+           |    "accountId":"$BOB_ACCOUNT_ID",
+           |    "copied":{
+           |      "$existingBlobId":"$copiedBlobId"
+           |    },
+           |    "notCopied":{
+           |      "$notFoundBlobId":{
+           |        "type":"notFound",
+           |        "description":"Blob BlobId($notFoundBlobId) could not be 
found"
+           |      }
+           |    }
+           |  },
+           |  "c1"
+           |]]""".stripMargin)
+  }
+
+  @Test
+  def shouldReturnFromAccountNotFoundWhenFromAccountInvalid(): Unit = {
+    val request: String =
+      s"""{
+         |  "using": [ "urn:ietf:params:jmap:core" ],
+         |  "methodCalls": [[
+         |    "Blob/copy",
+         |    {
+         |      "fromAccountId": "unknownFromAccountId",
+         |      "accountId": "$BOB_ACCOUNT_ID",
+         |      "blobIds": [ "blobId" ]
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+
+    val response: String = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(ContentType.JSON)
+      .extract
+      .body
+      .asString
+
+    assertThatJson(response)
+      .inPath("methodResponses")
+      .isEqualTo(
+        """[["error",{"type":"fromAccountNotFound"},"c1"]]""")
+  }
+
+  @Category(Array(classOf[BasicFeature]))
+  @Test
+  def shouldFailWhenTargetAccountNotDelegated(): Unit = {
+    // Alice does NOT delegate her account to Bob
+
+    // upload a blob to Bob's account
+    val bobBlobId: String = `given`
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .contentType(ContentType.BINARY)
+      .body(TEN_KILO_BYTES)
+    .when
+      .post(s"/upload/$BOB_ACCOUNT_ID")
+    .`then`
+      .statusCode(SC_CREATED)
+      .extract
+      .jsonPath()
+      .getString("blobId")
+
+    // Bob tries to copy the blob from his account to Alice's account
+    val request: String =
+      s"""{
+         |  "using": [ "urn:ietf:params:jmap:core" ],
+         |  "methodCalls": [[
+         |    "Blob/copy",
+         |    {
+         |      "fromAccountId": "$BOB_ACCOUNT_ID",
+         |      "accountId": "$ALICE_ACCOUNT_ID",
+         |      "blobIds": [ "$bobBlobId" ]
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+
+    val response: String = `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(ContentType.JSON)
+      .extract
+      .body
+      .asString
+
+    assertThatJson(response)
+      .inPath("methodResponses")
+      .isEqualTo(
+        """[["error",{"type":"accountNotFound"},"c1"]]""")
+  }
+
+  @Test
+  def shouldFailWhenAliceCopiesToBobWithoutDelegation(server: 
GuiceJamesServer): Unit = {
+    // Alice delegates Bob to access her account
+    server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ALICE, BOB)
+
+    // Alice uploads a blob to her account
+    val aliceBlobId: String = `given`
+      .auth().preemptive().basic(ALICE.asString(), ALICE_PASSWORD)
+      .basePath("")
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .contentType(ContentType.BINARY)
+      .body(TEN_KILO_BYTES)
+    .when
+      .post(s"/upload/$ALICE_ACCOUNT_ID")
+    .`then`
+      .statusCode(SC_CREATED)
+      .extract
+      .jsonPath()
+      .getString("blobId")
+
+    // Alice tries to copy the blob to Bob's account (no delegation from Bob 
to Alice)
+    val request: String =
+      s"""{
+         |  "using": [ "urn:ietf:params:jmap:core" ],
+         |  "methodCalls": [[
+         |    "Blob/copy",
+         |    {
+         |      "fromAccountId": "$ALICE_ACCOUNT_ID",
+         |      "accountId": "$BOB_ACCOUNT_ID",
+         |      "blobIds": [ "$aliceBlobId" ]
+         |    },
+         |    "c1"]]
+         |}""".stripMargin
+
+    val response: String = `given`
+      .auth().preemptive().basic(ALICE.asString(), ALICE_PASSWORD)
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .contentType(ContentType.JSON)
+      .extract
+      .body
+      .asString
+
+    assertThatJson(response)
+      .inPath("methodResponses")
+      .isEqualTo(
+        """[["error",{"type":"accountNotFound"},"c1"]]""")
+  }
+}
diff --git 
a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryBlobCopyTest.java
 
b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryBlobCopyTest.java
new file mode 100644
index 0000000000..8f585fe9f8
--- /dev/null
+++ 
b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryBlobCopyTest.java
@@ -0,0 +1,49 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *  http://www.apache.org/licenses/LICENSE-2.0                  *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.memory;
+
+import static 
org.apache.james.data.UsersRepositoryModuleChooser.Implementation.DEFAULT;
+
+import org.apache.james.JamesServerBuilder;
+import org.apache.james.JamesServerExtension;
+import org.apache.james.MemoryJamesConfiguration;
+import org.apache.james.MemoryJamesServerMain;
+import org.apache.james.jmap.rfc8621.contract.BlobCopyContract;
+import org.apache.james.jmap.rfc8621.contract.BlobCopyContract$;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableMap;
+
+public class MemoryBlobCopyTest implements BlobCopyContract {
+    @RegisterExtension
+    static JamesServerExtension testExtension = new 
JamesServerBuilder<MemoryJamesConfiguration>(tmpDir ->
+        MemoryJamesConfiguration.builder()
+            .workingDirectory(tmpDir)
+            .configurationFromClasspath()
+            .usersRepository(DEFAULT)
+            .enableJMAP()
+            .build())
+        .server(configuration -> 
MemoryJamesServerMain.createServer(configuration)
+            .overrideWith(new 
TestJMAPServerModule(ImmutableMap.of("upload.quota.limit", 
BlobCopyContract$.MODULE$.TWENTY_KILO_BYTES_UPLOAD_QUOTA_LIMIT())),
+                new DelegationProbeModule()))
+        .build();
+}
diff --git 
a/server/protocols/jmap-rfc-8621-integration-tests/postgres-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/postgres/PostgresBlobCopyTest.java
 
b/server/protocols/jmap-rfc-8621-integration-tests/postgres-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/postgres/PostgresBlobCopyTest.java
new file mode 100644
index 0000000000..8795e0593b
--- /dev/null
+++ 
b/server/protocols/jmap-rfc-8621-integration-tests/postgres-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/postgres/PostgresBlobCopyTest.java
@@ -0,0 +1,61 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *  http://www.apache.org/licenses/LICENSE-2.0                  *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.postgres;
+
+import static 
org.apache.james.data.UsersRepositoryModuleChooser.Implementation.DEFAULT;
+
+import org.apache.james.JamesServerBuilder;
+import org.apache.james.JamesServerExtension;
+import org.apache.james.PostgresJamesConfiguration;
+import org.apache.james.PostgresJamesServerMain;
+import org.apache.james.SearchConfiguration;
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.jmap.rfc8621.contract.BlobCopyContract;
+import org.apache.james.jmap.rfc8621.contract.BlobCopyContract$;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
+import org.apache.james.modules.RabbitMQExtension;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.apache.james.modules.blobstore.BlobStoreConfiguration;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableMap;
+
+public class PostgresBlobCopyTest implements BlobCopyContract { // Runs the shared BlobCopyContract tests against a server created by PostgresJamesServerMain.
+    @RegisterExtension // JUnit 5 extension field holding the James server under test
+    static JamesServerExtension testExtension = new 
JamesServerBuilder<PostgresJamesConfiguration>(tmpDir ->
+        PostgresJamesConfiguration.builder()
+            .workingDirectory(tmpDir)
+            .configurationFromClasspath()
+            .searchConfiguration(SearchConfiguration.scanning())
+            .usersRepository(DEFAULT)
+            .eventBusImpl(PostgresJamesConfiguration.EventBusImpl.RABBITMQ) // RabbitMQ event bus; a RabbitMQExtension is registered below
+            .blobStore(BlobStoreConfiguration.builder() // Postgres blob store: cache disabled, deduplication enabled, no crypto config
+                .postgres()
+                .disableCache()
+                .deduplication()
+                .noCryptoConfig())
+            .build())
+        .extension(PostgresExtension.empty())
+        .extension(new RabbitMQExtension())
+        .server(configuration -> 
PostgresJamesServerMain.createServer(configuration)
+            .overrideWith(new 
TestJMAPServerModule(ImmutableMap.of("upload.quota.limit", 
BlobCopyContract$.MODULE$.TWENTY_KILO_BYTES_UPLOAD_QUOTA_LIMIT()))) // "upload.quota.limit" is set from the constant on the Scala BlobCopyContract companion object
+            .overrideWith(new DelegationProbeModule())) // presumably lets the contract tests set up delegation between accounts — TODO confirm
+        .build();
+}
diff --git a/server/protocols/jmap-rfc-8621/doc/specs/spec/jmap/binary.mdown 
b/server/protocols/jmap-rfc-8621/doc/specs/spec/jmap/binary.mdown
index a93efd8f84..7b8ccdf017 100644
--- a/server/protocols/jmap-rfc-8621/doc/specs/spec/jmap/binary.mdown
+++ b/server/protocols/jmap-rfc-8621/doc/specs/spec/jmap/binary.mdown
@@ -67,8 +67,8 @@ When an HTTP error response is returned to the client, the 
server SHOULD return
 
 ## Blob/copy
 
-> :warning:
-> Not implemented.
+> :information_source:
+> Implemented
 
 Binary data may be copied **between** two different accounts using the 
*Blob/copy* method rather than having to download and then reupload on the 
client.
 
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/BlobCopy.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/BlobCopy.scala
new file mode 100644
index 0000000000..1be3fa46fc
--- /dev/null
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/BlobCopy.scala
@@ -0,0 +1,40 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.core
+
+import org.apache.james.jmap.mail.{BlobId, BlobIds, RequestTooLargeException}
+import org.apache.james.jmap.method.{ValidableRequest, WithAccountId}
+
+case class BlobCopyRequest(fromAccountId: AccountId, // account the blobs are read from
+                           accountId: AccountId, // account the copies are created for
+                           blobIds: BlobIds) extends WithAccountId with 
ValidableRequest {
+  override def validate(configuration: JmapRfc8621Configuration): 
Either[Exception, BlobCopyRequest] =
+    if (blobIds.value.size > configuration.maxObjectsInSet.value.value) { // more blob ids than maxObjectsInSet: reject with RequestTooLargeException
+      Left(RequestTooLargeException(s"""Too many items in a Blob/copy request.
+        Got ${blobIds.value.size} items instead of maximum 
${configuration.maxObjectsInSet.value.value}."""))
+    } else { // within the limit: the request itself is returned unchanged
+      scala.Right(this)
+    }
+}
+
+case class BlobCopyResponse(fromAccountId: AccountId, // BlobCopyMethod fills this from the request's fromAccountId
+                            accountId: AccountId, // BlobCopyMethod fills this from the request's accountId
+                            copied: Option[Map[BlobId, BlobId]], // source blob id -> id of the created copy
+                            notCopied: Option[Map[BlobId, SetError]]) // source blob id -> error describing why it was not copied
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/Invocation.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/Invocation.scala
index 8372fba09b..1d06a42f60 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/Invocation.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/Invocation.scala
@@ -70,6 +70,10 @@ object ErrorCode {
     override def code: String = "accountNotFound"
   }
 
+  case object FromAccountNotFound extends ErrorCode { // method-level error used by Blob/copy when the source account cannot be resolved
+    override def code: String = "fromAccountNotFound"
+  }
+
   case object Forbidden extends ErrorCode {
     override def code: String = "forbidden"
   }
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/BlobCopySerializer.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/BlobCopySerializer.scala
new file mode 100644
index 0000000000..bc495956fe
--- /dev/null
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/BlobCopySerializer.scala
@@ -0,0 +1,41 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *  http://www.apache.org/licenses/LICENSE-2.0                  *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.json
+
+import org.apache.james.jmap.core.{BlobCopyRequest, BlobCopyResponse, SetError}
+import org.apache.james.jmap.mail.{BlobId, BlobIds}
+import play.api.libs.json.{JsObject, JsResult, Json, OWrites, Reads, Writes}
+
+class BlobCopySerializer { // play-json reading of Blob/copy requests and writing of Blob/copy responses
+  private implicit val blobIdWrites: Writes[BlobId] = Json.valueWrites[BlobId]
+  private implicit val blobIdsReads: Reads[BlobIds] = Json.valueReads[BlobIds]
+  // Both response maps are keyed by BlobId; _.value.value is presumably the key-to-string function expected by mapWrites (helper not imported in this file — TODO confirm).
+  private implicit val copiedWrites: Writes[Map[BlobId, BlobId]] =
+    mapWrites[BlobId, BlobId](_.value.value, blobIdWrites)
+  private implicit val notCopiedWrites: Writes[Map[BlobId, SetError]] =
+    mapWrites[BlobId, SetError](_.value.value, setErrorWrites)
+  // Macro-derived Reads/OWrites for the request and response case classes.
+  private implicit val blobCopyRequestReads: Reads[BlobCopyRequest] = 
Json.reads[BlobCopyRequest]
+  private implicit val blobCopyResponseWrites: OWrites[BlobCopyResponse] = 
Json.writes[BlobCopyResponse]
+  // Parses a JSON object into a BlobCopyRequest; failures are reported through the returned JsResult.
+  def deserializeBlobCopyRequest(input: JsObject): JsResult[BlobCopyRequest] = 
Json.fromJson[BlobCopyRequest](input)
+  // Renders a BlobCopyResponse as a JSON object.
+  def serializeBlobCopyResponse(response: BlobCopyResponse): JsObject = 
Json.toJsObject(response)
+}
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/BlobCopyMethod.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/BlobCopyMethod.scala
new file mode 100644
index 0000000000..b5bf6ed4f6
--- /dev/null
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/BlobCopyMethod.scala
@@ -0,0 +1,148 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *  http://www.apache.org/licenses/LICENSE-2.0                  *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.method
+
+import eu.timepit.refined.auto._
+import jakarta.inject.Inject
+import org.apache.james.core.Username
+import org.apache.james.jmap.api.upload.UploadService
+import org.apache.james.jmap.core.CapabilityIdentifier.{CapabilityIdentifier, 
JMAP_CORE}
+import org.apache.james.jmap.core.Invocation.{Arguments, MethodName}
+import org.apache.james.jmap.core.SetError.SetErrorDescription
+import org.apache.james.jmap.core.{BlobCopyRequest, BlobCopyResponse, 
ErrorCode, Invocation, JmapRfc8621Configuration, SessionTranslator, SetError}
+import org.apache.james.jmap.json.BlobCopySerializer
+import org.apache.james.jmap.mail.BlobId
+import org.apache.james.jmap.mail.MDNParse.UnparsedBlobId
+import org.apache.james.jmap.routes.UploadRoutes.asBlobId
+import org.apache.james.jmap.routes.{Blob, BlobNotFoundException, 
BlobResolvers, SessionSupplier}
+import org.apache.james.mailbox.{MailboxSession, SessionProvider}
+import org.apache.james.metrics.api.MetricFactory
+import org.apache.james.util.ReactorUtils
+import org.reactivestreams.Publisher
+import org.slf4j.{Logger, LoggerFactory}
+import reactor.core.publisher.Mono
+import reactor.core.scala.publisher.{SFlux, SMono}
+
+sealed trait CopyResult { // outcome of copying a single blob
+  def sourceBlobId: BlobId // id of the blob that was requested to be copied
+}
+case class Copied(sourceBlobId: BlobId, copied: BlobId) extends CopyResult // success: `copied` is the id of the newly created blob
+case class NotCopied(sourceBlobId: BlobId, error: SetError) extends CopyResult // failure: `error` describes why this blob was not copied
+case class CopyResults(results: Seq[CopyResult]) { // splits per-blob outcomes into the two maps of a BlobCopyResponse
+  def copied: Option[Map[BlobId, BlobId]] = // source id -> copy id; None when there is no Copied result
+    Option(results.collect { case Copied(source, copied) => source -> copied 
}.toMap).filter(_.nonEmpty)
+  // source id -> error; None when there is no NotCopied result
+  def notCopied: Option[Map[BlobId, SetError]] =
+    Option(results.collect { case NotCopied(source, error) => source -> error 
}.toMap).filter(_.nonEmpty)
+}
+
+case class FromAccountNotFoundException() extends RuntimeException // raised when the source account of a Blob/copy cannot be resolved; mapped to the fromAccountNotFound error
+
+class BlobCopyMethod @Inject()(val metricFactory: MetricFactory, // JMAP "Blob/copy": resolves blobs with a source session and re-uploads them for the target user
+                               val sessionSupplier: SessionSupplier,
+                               val sessionTranslator: SessionTranslator,
+                               val sessionProvider: SessionProvider,
+                               val blobResolvers: BlobResolvers,
+                               val uploadService: UploadService,
+                               val serializer: BlobCopySerializer,
+                               val configuration: JmapRfc8621Configuration) 
extends MethodRequiringAccountId[BlobCopyRequest] {
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[BlobCopyMethod])
+
+  override val methodName: MethodName = MethodName("Blob/copy")
+  override val requiredCapabilities: Set[CapabilityIdentifier] = Set(JMAP_CORE)
+  // Entry point: resolve the source session, copy every requested blob, then wrap the response or a method-level error.
+  override def doProcess(capabilities: Set[CapabilityIdentifier], invocation: 
InvocationWithContext, mailboxSession: MailboxSession, request: 
BlobCopyRequest): Publisher[InvocationWithContext] =
+    resolveSourceSession(request, mailboxSession)
+      .flatMap(sourceSession => copyBlobs(request, sourceSession, 
targetSession = mailboxSession))
+      .map(response => asInvocation(response, invocation))
+      .onErrorResume {
+        case _: FromAccountNotFoundException => // source account could not be resolved -> fromAccountNotFound
+          SMono.just(asErrorInvocation(ErrorCode.FromAccountNotFound, 
invocation))
+        case e => // any other failure is logged and reported as serverFail
+          LOGGER.error("Failed to copy blob", e)
+          SMono.just(asErrorInvocation(ErrorCode.ServerFail, e.getMessage, 
invocation))
+      }
+  // Session used to read the source blobs: the caller's session when both account ids are equal, otherwise a session delegated to fromAccountId.
+  private def resolveSourceSession(request: BlobCopyRequest, mailboxSession: 
MailboxSession): SMono[MailboxSession] =
+    if (request.fromAccountId.equals(request.accountId)) {
+      SMono.just(mailboxSession)
+    } else {
+      SMono(Mono.justOrEmpty(mailboxSession.getLoggedInUser))
+        .flatMap(createLoggedInUserSession)
+        .flatMap(session => sessionTranslator.delegateIfNeeded(session, 
request.fromAccountId)
+          .onErrorResume { case _: AccountNotFoundException => 
SMono.error(FromAccountNotFoundException())}) // NOTE(review): partial match — an error other than AccountNotFoundException would not match here (MatchError); confirm this is acceptable
+        .switchIfEmpty(SMono.error(FromAccountNotFoundException())) // an empty chain (e.g. no logged-in user) is also reported as fromAccountNotFound
+    }
+  // Builds a session for the logged-in user with delegation removed.
+  private def createLoggedInUserSession(loggedInUser: Username): 
SMono[MailboxSession] =
+    SMono.fromCallable(() => 
sessionProvider.authenticate(loggedInUser).withoutDelegation())
+      .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER) // presumably because authenticate may block — TODO confirm
+  // Wraps a successful response into a "Blob/copy" invocation carrying the original methodCallId.
+  private def asInvocation(response: BlobCopyResponse, invocation: 
InvocationWithContext): InvocationWithContext =
+    InvocationWithContext(
+      Invocation(
+        methodName = methodName,
+        arguments = Arguments(serializer.serializeBlobCopyResponse(response)),
+        methodCallId = invocation.invocation.methodCallId),
+      invocation.processingContext)
+  // Error invocation without a description.
+  private def asErrorInvocation(errorCode: ErrorCode, invocation: 
InvocationWithContext): InvocationWithContext =
+    InvocationWithContext(
+      Invocation.error(errorCode, invocation.invocation.methodCallId),
+      invocation.processingContext)
+  // Error invocation with a description.
+  private def asErrorInvocation(errorCode: ErrorCode, description: String = 
"", invocation: InvocationWithContext): InvocationWithContext =
+    InvocationWithContext(
+      Invocation.error(errorCode, description, 
invocation.invocation.methodCallId),
+      invocation.processingContext)
+  // Deserializes the invocation arguments, then applies BlobCopyRequest.validate with the JMAP configuration.
+  override def getRequest(mailboxSession: MailboxSession, invocation: 
Invocation): Either[Exception, BlobCopyRequest] =
+    
serializer.deserializeBlobCopyRequest(invocation.arguments.value).asEitherRequest
+      .flatMap(request => request.validate(configuration).map(_ => request))
+  // Copies each requested blob (concurrency bounded by DEFAULT_CONCURRENCY) and folds the outcomes into the response.
+  private def copyBlobs(request: BlobCopyRequest, sourceSession: 
MailboxSession, targetSession: MailboxSession): SMono[BlobCopyResponse] =
+    SFlux.fromIterable(request.blobIds.value)
+      .flatMap(blobId => copyBlob(blobId, sourceSession, targetSession), 
ReactorUtils.DEFAULT_CONCURRENCY)
+      .collectSeq()
+      .map(CopyResults)
+      .map(results => BlobCopyResponse(request.fromAccountId, 
request.accountId, results.copied, results.notCopied))
+  // Copies one blob; a failure while resolving or uploading becomes a NotCopied entry instead of failing the whole call.
+  private def copyBlob(unparsedBlobId: UnparsedBlobId, sourceSession: 
MailboxSession, targetSession: MailboxSession): SMono[CopyResult] =
+    SMono.fromTry(BlobId.of(unparsedBlobId)) // NOTE(review): a failure of BlobId.of itself is outside the per-blob onErrorResume below
+      .flatMap { sourceBlobId =>
+        blobResolvers.resolve(sourceBlobId, sourceSession)
+          .flatMap(blob => uploadBlob(sourceBlobId, blob, targetSession))
+          .onErrorResume(e => SMono.just(NotCopied(sourceBlobId, 
asSetError(sourceBlobId, sourceSession, targetSession, e))))
+      }
+  // Uploads the resolved blob for targetSession.getUser and maps the resulting uploadId to a blob id with asBlobId.
+  private def uploadBlob(sourceBlobId: BlobId, blob: Blob, targetSession: 
MailboxSession): SMono[CopyResult] =
+    SMono.fromPublisher(uploadService.upload(blob.content, blob.contentType, 
targetSession.getUser))
+      .map(upload => Copied(sourceBlobId, asBlobId(upload.uploadId)))
+  // Per-blob error mapping: BlobNotFoundException -> notFound (info log); anything else -> serverFail (error log).
+  private def asSetError(sourceId: BlobId, sourceSession: MailboxSession, 
targetSession: MailboxSession, throwable: Throwable): SetError =
+    throwable match {
+      case e: BlobNotFoundException =>
+        LOGGER.info(s"Could not copy blob as ${e.blobId} is not found")
+        SetError.notFound(SetErrorDescription(s"Blob ${e.blobId} could not be 
found"))
+      case _ =>
+        LOGGER.error(s"Failed to copy blob $sourceId from account 
${sourceSession.getUser.asString()} to account 
${targetSession.getUser.asString()}", throwable)
+        SetError.serverFail(SetErrorDescription(throwable.getMessage))
+    }
+}
diff --git 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
index 45f18fa67c..1d3a458351 100644
--- 
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
+++ 
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
@@ -26,8 +26,8 @@ import java.util.stream
 import java.util.stream.Stream
 
 import io.netty.handler.codec.http.HttpHeaderNames.{CONTENT_LENGTH, 
CONTENT_TYPE}
+import io.netty.handler.codec.http.HttpMethod
 import io.netty.handler.codec.http.HttpResponseStatus.{BAD_REQUEST, CREATED, 
FORBIDDEN, INTERNAL_SERVER_ERROR, UNAUTHORIZED}
-import io.netty.handler.codec.http.{HttpMethod, HttpResponseStatus}
 import jakarta.inject.{Inject, Named}
 import org.apache.commons.fileupload.util.LimitedInputStream
 import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
@@ -42,7 +42,7 @@ import org.apache.james.jmap.http.rfc8621.InjectionKeys
 import org.apache.james.jmap.json.{ResponseSerializer, UploadSerializer}
 import org.apache.james.jmap.mail.BlobId
 import org.apache.james.jmap.method.AccountNotFoundException
-import org.apache.james.jmap.routes.UploadRoutes.LOGGER
+import org.apache.james.jmap.routes.UploadRoutes.{LOGGER, asBlobId}
 import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
 import org.apache.james.mailbox.MailboxSession
 import org.apache.james.mailbox.model.ContentType
@@ -57,6 +57,9 @@ case class TooBigUploadException() extends RuntimeException
 
 object UploadRoutes {
   val LOGGER: Logger = LoggerFactory.getLogger(classOf[UploadRoutes])
+
+  def asBlobId(uploadId: UploadId): BlobId = // blob id of an upload: "uploads-" followed by the upload id string
+    BlobId.of(s"uploads-${uploadId.asString()}").get // .get assumes BlobId.of succeeds for this prefixed string — TODO confirm
 }
 
 case class UploadResponse(accountId: AccountId,
@@ -160,8 +163,6 @@ class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621) 
val authenticator: A
         size = uploadMetaData.size,
         accountId = accountId)
 
-  private def asBlobId(uploadId: UploadId): BlobId = 
BlobId.of(s"uploads-${uploadId.asString()}" ).get
-
   private def respondDetails(httpServerResponse: HttpServerResponse, details: 
ProblemDetails): SMono[Void] =
     SMono.fromCallable(() => ResponseSerializer.serialize(details).toString)
       .map(_.getBytes(StandardCharsets.UTF_8))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to