This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/postgresql by this push:
     new 12278b9357 JAMES-2586 Fix postgres mailbox acl concurrency issue (#2551)
12278b9357 is described below

commit 12278b935782070293881f3ec0379a5bd9ae6dd0
Author: hungphan227 <[email protected]>
AuthorDate: Tue Dec 10 17:14:43 2024 +0700

    JAMES-2586 Fix postgres mailbox acl concurrency issue (#2551)
    
    Co-authored-by: hung phan <[email protected]>
---
 .../postgres/mail/PostgresACLUpsertException.java} | 16 ++------
 .../postgres/mail/PostgresMailboxMapper.java       | 30 +++++++-------
 .../postgres/mail/PostgresMailboxModule.java       |  2 +
 .../postgres/mail/dao/PostgresMailboxDAO.java      | 25 +++++++++++-
 .../mail/PostgresMailboxMapperACLTest.java         | 47 +++++++++++++++++++++-
 .../store/mail/model/MailboxMapperACLTest.java     |  2 +-
 6 files changed, 91 insertions(+), 31 deletions(-)

diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresACLUpsertException.java
similarity index 63%
copy from mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
copy to mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresACLUpsertException.java
index 9f700ac804..f87e8090ac 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresACLUpsertException.java
@@ -19,18 +19,8 @@
 
 package org.apache.james.mailbox.postgres.mail;
 
-import org.apache.james.backends.postgres.PostgresExtension;
-import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
-import org.apache.james.mailbox.store.mail.MailboxMapper;
-import org.apache.james.mailbox.store.mail.model.MailboxMapperACLTest;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-class PostgresMailboxMapperACLTest extends MailboxMapperACLTest {
-    @RegisterExtension
-    static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresMailboxModule.MODULE);
-
-    @Override
-    protected MailboxMapper createMailboxMapper() {
-        return new PostgresMailboxMapper(new PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()));
+public class PostgresACLUpsertException extends RuntimeException {
+    public PostgresACLUpsertException(String message) {
+        super(message);
     }
 }
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
index 0974c0529e..e68bb3f185 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
@@ -19,10 +19,12 @@
 
 package org.apache.james.mailbox.postgres.mail;
 
+import java.time.Duration;
 import java.util.function.Function;
 
 import org.apache.james.core.Username;
 import org.apache.james.mailbox.acl.ACLDiff;
+import org.apache.james.mailbox.exception.UnsupportedRightException;
 import org.apache.james.mailbox.model.Mailbox;
 import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.model.MailboxId;
@@ -32,10 +34,9 @@ import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 
-import com.github.fge.lambdas.Throwing;
-
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 public class PostgresMailboxMapper implements MailboxMapper {
     private final PostgresMailboxDAO postgresMailboxDAO;
@@ -96,21 +97,22 @@ public class PostgresMailboxMapper implements MailboxMapper {
 
     @Override
     public Mono<ACLDiff> updateACL(Mailbox mailbox, MailboxACL.ACLCommand mailboxACLCommand) {
-        return upsertACL(mailbox,
-            mailbox.getACL(),
-            Throwing.supplier(() -> mailbox.getACL().apply(mailboxACLCommand)).get());
+        return postgresMailboxDAO.getACL(mailbox.getMailboxId())
+            .flatMap(pairMailboxACLAndVersion -> {
+                try {
+                    MailboxACL newACL = pairMailboxACLAndVersion.getLeft().apply(mailboxACLCommand);
+                    return postgresMailboxDAO.upsertACL(mailbox.getMailboxId(), newACL, pairMailboxACLAndVersion.getRight())
+                        .thenReturn(ACLDiff.computeDiff(pairMailboxACLAndVersion.getLeft(), newACL));
+                } catch (UnsupportedRightException e) {
+                    throw new RuntimeException(e);
+                }
+            }).retryWhen(Retry.backoff(3, Duration.ofMillis(100))
+                .filter(throwable -> throwable instanceof PostgresACLUpsertException));
     }
 
     @Override
     public Mono<ACLDiff> setACL(Mailbox mailbox, MailboxACL mailboxACL) {
-        return upsertACL(mailbox, mailbox.getACL(), mailboxACL);
-    }
-
-    private Mono<ACLDiff> upsertACL(Mailbox mailbox, MailboxACL oldACL, MailboxACL newACL) {
-        return postgresMailboxDAO.upsertACL(mailbox.getMailboxId(), newACL)
-            .then(Mono.fromCallable(() -> {
-                mailbox.setACL(newACL);
-                return ACLDiff.computeDiff(oldACL, newACL);
-            }));
+        return postgresMailboxDAO.upsertACL(mailbox.getMailboxId(), mailboxACL)
+            .thenReturn(ACLDiff.computeDiff(mailbox.getACL(), mailboxACL));
     }
 }
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
index 5b17924d01..0c302964a9 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
@@ -47,6 +47,7 @@ public interface PostgresMailboxModule {
         Field<Long> MAILBOX_LAST_UID = DSL.field("mailbox_last_uid", BIGINT);
         Field<Long> MAILBOX_HIGHEST_MODSEQ = DSL.field("mailbox_highest_modseq", BIGINT);
         Field<Hstore> MAILBOX_ACL = DSL.field("mailbox_acl", org.jooq.impl.DefaultDataType.getDefaultDataType("hstore").asConvertedDataType(new HstoreBinding()));
+        Field<Long> MAILBOX_ACL_VERSION = DSL.field("mailbox_acl_version", BIGINT.notNull().defaultValue(DSL.field("0", BIGINT)));
 
         Name MAILBOX_NAME_USER_NAME_NAMESPACE_UNIQUE_CONSTRAINT = DSL.name("mailbox_mailbox_name_user_name_mailbox_namespace_key");
 
@@ -60,6 +61,7 @@ public interface PostgresMailboxModule {
                 .column(MAILBOX_LAST_UID)
                 .column(MAILBOX_HIGHEST_MODSEQ)
                 .column(MAILBOX_ACL)
+                .column(MAILBOX_ACL_VERSION)
                 .constraint(DSL.primaryKey(MAILBOX_ID))
                 .constraint(DSL.constraint(MAILBOX_NAME_USER_NAME_NAMESPACE_UNIQUE_CONSTRAINT).unique(MAILBOX_NAME, USER_NAME, MAILBOX_NAMESPACE))))
             .supportsRowLevelSecurity()
diff --git a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
index 616e4bd1e2..a8bd57b8ba 100644
--- a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
+++ b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.postgres.mail.dao;
 
 import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ACL;
+import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ACL_VERSION;
 import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_HIGHEST_MODSEQ;
 import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ID;
 import static org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_LAST_UID;
@@ -55,6 +56,7 @@ import org.apache.james.mailbox.model.UidValidity;
 import org.apache.james.mailbox.model.search.MailboxQuery;
 import org.apache.james.mailbox.model.search.Wildcard;
 import org.apache.james.mailbox.postgres.PostgresMailboxId;
+import org.apache.james.mailbox.postgres.mail.PostgresACLUpsertException;
 import org.apache.james.mailbox.postgres.mail.PostgresMailbox;
 import org.apache.james.mailbox.store.MailboxExpressionBackwardCompatibility;
 import org.jooq.Condition;
@@ -150,15 +152,34 @@ public class PostgresMailboxDAO {
             .switchIfEmpty(Mono.error(new MailboxNotFoundException(mailbox.getMailboxId())));
     }
 
+    public Mono<Void> upsertACL(MailboxId mailboxId, MailboxACL acl, Long currentAclVersion) {
+        return postgresExecutor.executeReturnAffectedRowsCount(dslContext -> Mono.from(dslContext.update(TABLE_NAME)
+                .set(MAILBOX_ACL, MAILBOX_ACL_TO_HSTORE_FUNCTION.apply(acl))
+                .set(MAILBOX_ACL_VERSION, currentAclVersion + 1)
+                .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))
+                .and(MAILBOX_ACL_VERSION.eq(currentAclVersion))))
+            .filter(count -> count > 0)
+            .switchIfEmpty(Mono.error(new PostgresACLUpsertException("Upsert mailbox acl failed with mailboxId " + mailboxId.serialize())))
+            .then();
+    }
+
     public Mono<Void> upsertACL(MailboxId mailboxId, MailboxACL acl) {
         return postgresExecutor.executeReturnAffectedRowsCount(dslContext -> Mono.from(dslContext.update(TABLE_NAME)
-            .set(MAILBOX_ACL, MAILBOX_ACL_TO_HSTORE_FUNCTION.apply(acl))
-            .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))))
+                .set(MAILBOX_ACL, MAILBOX_ACL_TO_HSTORE_FUNCTION.apply(acl))
+                .set(DSL.field(MAILBOX_ACL_VERSION.getName()), (Object) DSL.field(MAILBOX_ACL_VERSION.getName() + " + 1"))
+                .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))))
             .filter(count -> count > 0)
             .switchIfEmpty(Mono.error(new RuntimeException("Upsert mailbox acl failed with mailboxId " + mailboxId.serialize())))
             .then();
     }
 
+    public Mono<Pair<MailboxACL, Long>> getACL(MailboxId mailboxId) {
+        return postgresExecutor.executeRow(dsl -> Mono.from(dsl.select(MAILBOX_ACL, MAILBOX_ACL_VERSION)
+                .from(TABLE_NAME)
+                .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))))
+            .map(record -> Pair.of(Optional.ofNullable(record.get(MAILBOX_ACL)).map(HSTORE_TO_MAILBOX_ACL_FUNCTION).orElse(new MailboxACL()), record.get(MAILBOX_ACL_VERSION)));
+    }
+
     public Flux<PostgresMailbox> findMailboxesByUsername(Username userName) {
         return postgresExecutor.executeRows(dslContext -> Flux.from(dslContext.select(MAILBOX_ID,
                     MAILBOX_NAME,
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
index 9f700ac804..67f44e77ed 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
@@ -19,18 +19,63 @@
 
 package org.apache.james.mailbox.postgres.mail;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
 import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.mailbox.model.MailboxACL;
 import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
 import org.apache.james.mailbox.store.mail.MailboxMapper;
 import org.apache.james.mailbox.store.mail.model.MailboxMapperACLTest;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import com.google.common.collect.ImmutableMap;
+
 class PostgresMailboxMapperACLTest extends MailboxMapperACLTest {
     @RegisterExtension
     static PostgresExtension postgresExtension = PostgresExtension.withoutRowLevelSecurity(PostgresMailboxModule.MODULE);
 
+    private PostgresMailboxMapper mailboxMapper;
+
     @Override
     protected MailboxMapper createMailboxMapper() {
-        return new PostgresMailboxMapper(new PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()));
+        mailboxMapper = new PostgresMailboxMapper(new PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()));
+        return mailboxMapper;
+    }
+
+    @Test
+    protected void updateAclShouldWorkWellInMultiThreadEnv() throws ExecutionException, InterruptedException {
+        MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Administer, MailboxACL.Right.Write);
+        MailboxACL.Rfc4314Rights newRights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Write);
+
+        ConcurrentTestRunner.builder()
+            .reactorOperation((threadNumber, step) -> {
+                int userNumber = threadNumber / 2;
+                MailboxACL.EntryKey key = MailboxACL.EntryKey.createUserEntryKey("user" + userNumber);
+                if (threadNumber % 2 == 0) {
+                    return mailboxMapper.updateACL(benwaInboxMailbox, MailboxACL.command().key(key).rights(rights).asReplacement())
+                        .then();
+                } else {
+                    return mailboxMapper.updateACL(benwaInboxMailbox, MailboxACL.command().key(key).rights(newRights).asAddition())
+                        .then();
+                }
+            })
+            .threadCount(10)
+            .operationCount(1)
+            .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+        MailboxACL expectedMailboxACL = new MailboxACL(IntStream.range(0, 5).boxed()
+            .collect(ImmutableMap.toImmutableMap(userNumber -> MailboxACL.EntryKey.createUserEntryKey("user" + userNumber), userNumber -> rights)));
+
+        assertThat(
+            mailboxMapper.findMailboxById(benwaInboxMailbox.getMailboxId())
+                .block()
+                .getACL())
+            .isEqualTo(expectedMailboxACL);
     }
 }
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
index 88e87684a1..6e2f94a90f 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
@@ -43,7 +43,7 @@ public abstract class MailboxMapperACLTest {
     private static final Username USER_1 = Username.of("user1");
     private static final Username USER_2 = Username.of("user2");
 
-    private Mailbox benwaInboxMailbox;
+    protected Mailbox benwaInboxMailbox;
 
     private MailboxMapper mailboxMapper;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to