This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/postgresql by this push:
new 12278b9357 JAMES-2586 Fix postgres mailbox acl concurrency issue
(#2551)
12278b9357 is described below
commit 12278b935782070293881f3ec0379a5bd9ae6dd0
Author: hungphan227 <[email protected]>
AuthorDate: Tue Dec 10 17:14:43 2024 +0700
JAMES-2586 Fix postgres mailbox acl concurrency issue (#2551)
Co-authored-by: hung phan <[email protected]>
---
.../postgres/mail/PostgresACLUpsertException.java} | 16 ++------
.../postgres/mail/PostgresMailboxMapper.java | 30 +++++++-------
.../postgres/mail/PostgresMailboxModule.java | 2 +
.../postgres/mail/dao/PostgresMailboxDAO.java | 25 +++++++++++-
.../mail/PostgresMailboxMapperACLTest.java | 47 +++++++++++++++++++++-
.../store/mail/model/MailboxMapperACLTest.java | 2 +-
6 files changed, 91 insertions(+), 31 deletions(-)
diff --git
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresACLUpsertException.java
similarity index 63%
copy from
mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
copy to
mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresACLUpsertException.java
index 9f700ac804..f87e8090ac 100644
---
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
+++
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresACLUpsertException.java
@@ -19,18 +19,8 @@
package org.apache.james.mailbox.postgres.mail;
-import org.apache.james.backends.postgres.PostgresExtension;
-import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
-import org.apache.james.mailbox.store.mail.MailboxMapper;
-import org.apache.james.mailbox.store.mail.model.MailboxMapperACLTest;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-class PostgresMailboxMapperACLTest extends MailboxMapperACLTest {
- @RegisterExtension
- static PostgresExtension postgresExtension =
PostgresExtension.withoutRowLevelSecurity(PostgresMailboxModule.MODULE);
-
- @Override
- protected MailboxMapper createMailboxMapper() {
- return new PostgresMailboxMapper(new
PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()));
+public class PostgresACLUpsertException extends RuntimeException {
+ public PostgresACLUpsertException(String message) {
+ super(message);
}
}
diff --git
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
index 0974c0529e..e68bb3f185 100644
---
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
+++
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapper.java
@@ -19,10 +19,12 @@
package org.apache.james.mailbox.postgres.mail;
+import java.time.Duration;
import java.util.function.Function;
import org.apache.james.core.Username;
import org.apache.james.mailbox.acl.ACLDiff;
+import org.apache.james.mailbox.exception.UnsupportedRightException;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.model.MailboxId;
@@ -32,10 +34,9 @@ import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
import org.apache.james.mailbox.store.mail.MailboxMapper;
-import com.github.fge.lambdas.Throwing;
-
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
public class PostgresMailboxMapper implements MailboxMapper {
private final PostgresMailboxDAO postgresMailboxDAO;
@@ -96,21 +97,22 @@ public class PostgresMailboxMapper implements MailboxMapper
{
@Override
public Mono<ACLDiff> updateACL(Mailbox mailbox, MailboxACL.ACLCommand
mailboxACLCommand) {
- return upsertACL(mailbox,
- mailbox.getACL(),
- Throwing.supplier(() ->
mailbox.getACL().apply(mailboxACLCommand)).get());
+ return postgresMailboxDAO.getACL(mailbox.getMailboxId())
+ .flatMap(pairMailboxACLAndVersion -> {
+ try {
+ MailboxACL newACL =
pairMailboxACLAndVersion.getLeft().apply(mailboxACLCommand);
+ return
postgresMailboxDAO.upsertACL(mailbox.getMailboxId(), newACL,
pairMailboxACLAndVersion.getRight())
+
.thenReturn(ACLDiff.computeDiff(pairMailboxACLAndVersion.getLeft(), newACL));
+ } catch (UnsupportedRightException e) {
+ throw new RuntimeException(e);
+ }
+ }).retryWhen(Retry.backoff(3, Duration.ofMillis(100))
+ .filter(throwable -> throwable instanceof
PostgresACLUpsertException));
}
@Override
public Mono<ACLDiff> setACL(Mailbox mailbox, MailboxACL mailboxACL) {
- return upsertACL(mailbox, mailbox.getACL(), mailboxACL);
- }
-
- private Mono<ACLDiff> upsertACL(Mailbox mailbox, MailboxACL oldACL,
MailboxACL newACL) {
- return postgresMailboxDAO.upsertACL(mailbox.getMailboxId(), newACL)
- .then(Mono.fromCallable(() -> {
- mailbox.setACL(newACL);
- return ACLDiff.computeDiff(oldACL, newACL);
- }));
+ return postgresMailboxDAO.upsertACL(mailbox.getMailboxId(), mailboxACL)
+ .thenReturn(ACLDiff.computeDiff(mailbox.getACL(), mailboxACL));
}
}
diff --git
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
index 5b17924d01..0c302964a9 100644
---
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
+++
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxModule.java
@@ -47,6 +47,7 @@ public interface PostgresMailboxModule {
Field<Long> MAILBOX_LAST_UID = DSL.field("mailbox_last_uid", BIGINT);
Field<Long> MAILBOX_HIGHEST_MODSEQ =
DSL.field("mailbox_highest_modseq", BIGINT);
Field<Hstore> MAILBOX_ACL = DSL.field("mailbox_acl",
org.jooq.impl.DefaultDataType.getDefaultDataType("hstore").asConvertedDataType(new
HstoreBinding()));
+ Field<Long> MAILBOX_ACL_VERSION = DSL.field("mailbox_acl_version",
BIGINT.notNull().defaultValue(DSL.field("0", BIGINT)));
Name MAILBOX_NAME_USER_NAME_NAMESPACE_UNIQUE_CONSTRAINT =
DSL.name("mailbox_mailbox_name_user_name_mailbox_namespace_key");
@@ -60,6 +61,7 @@ public interface PostgresMailboxModule {
.column(MAILBOX_LAST_UID)
.column(MAILBOX_HIGHEST_MODSEQ)
.column(MAILBOX_ACL)
+ .column(MAILBOX_ACL_VERSION)
.constraint(DSL.primaryKey(MAILBOX_ID))
.constraint(DSL.constraint(MAILBOX_NAME_USER_NAME_NAMESPACE_UNIQUE_CONSTRAINT).unique(MAILBOX_NAME,
USER_NAME, MAILBOX_NAMESPACE))))
.supportsRowLevelSecurity()
diff --git
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
index 616e4bd1e2..a8bd57b8ba 100644
---
a/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
+++
b/mailbox/postgres/src/main/java/org/apache/james/mailbox/postgres/mail/dao/PostgresMailboxDAO.java
@@ -20,6 +20,7 @@
package org.apache.james.mailbox.postgres.mail.dao;
import static
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ACL;
+import static
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ACL_VERSION;
import static
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_HIGHEST_MODSEQ;
import static
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_ID;
import static
org.apache.james.mailbox.postgres.mail.PostgresMailboxModule.PostgresMailboxTable.MAILBOX_LAST_UID;
@@ -55,6 +56,7 @@ import org.apache.james.mailbox.model.UidValidity;
import org.apache.james.mailbox.model.search.MailboxQuery;
import org.apache.james.mailbox.model.search.Wildcard;
import org.apache.james.mailbox.postgres.PostgresMailboxId;
+import org.apache.james.mailbox.postgres.mail.PostgresACLUpsertException;
import org.apache.james.mailbox.postgres.mail.PostgresMailbox;
import org.apache.james.mailbox.store.MailboxExpressionBackwardCompatibility;
import org.jooq.Condition;
@@ -150,15 +152,34 @@ public class PostgresMailboxDAO {
.switchIfEmpty(Mono.error(new
MailboxNotFoundException(mailbox.getMailboxId())));
}
+ public Mono<Void> upsertACL(MailboxId mailboxId, MailboxACL acl, Long
currentAclVersion) {
+ return postgresExecutor.executeReturnAffectedRowsCount(dslContext ->
Mono.from(dslContext.update(TABLE_NAME)
+ .set(MAILBOX_ACL, MAILBOX_ACL_TO_HSTORE_FUNCTION.apply(acl))
+ .set(MAILBOX_ACL_VERSION, currentAclVersion + 1)
+ .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))
+ .and(MAILBOX_ACL_VERSION.eq(currentAclVersion))))
+ .filter(count -> count > 0)
+ .switchIfEmpty(Mono.error(new PostgresACLUpsertException("Upsert
mailbox acl failed with mailboxId " + mailboxId.serialize())))
+ .then();
+ }
+
public Mono<Void> upsertACL(MailboxId mailboxId, MailboxACL acl) {
return postgresExecutor.executeReturnAffectedRowsCount(dslContext ->
Mono.from(dslContext.update(TABLE_NAME)
- .set(MAILBOX_ACL, MAILBOX_ACL_TO_HSTORE_FUNCTION.apply(acl))
- .where(MAILBOX_ID.eq(((PostgresMailboxId) mailboxId).asUuid()))))
+ .set(MAILBOX_ACL, MAILBOX_ACL_TO_HSTORE_FUNCTION.apply(acl))
+ .set(DSL.field(MAILBOX_ACL_VERSION.getName()), (Object)
DSL.field(MAILBOX_ACL_VERSION.getName() + " + 1"))
+ .where(MAILBOX_ID.eq(((PostgresMailboxId)
mailboxId).asUuid()))))
.filter(count -> count > 0)
.switchIfEmpty(Mono.error(new RuntimeException("Upsert mailbox acl
failed with mailboxId " + mailboxId.serialize())))
.then();
}
+ public Mono<Pair<MailboxACL, Long>> getACL(MailboxId mailboxId) {
+ return postgresExecutor.executeRow(dsl ->
Mono.from(dsl.select(MAILBOX_ACL, MAILBOX_ACL_VERSION)
+ .from(TABLE_NAME)
+ .where(MAILBOX_ID.eq(((PostgresMailboxId)
mailboxId).asUuid()))))
+ .map(record ->
Pair.of(Optional.ofNullable(record.get(MAILBOX_ACL)).map(HSTORE_TO_MAILBOX_ACL_FUNCTION).orElse(new
MailboxACL()), record.get(MAILBOX_ACL_VERSION)));
+ }
+
public Flux<PostgresMailbox> findMailboxesByUsername(Username userName) {
return postgresExecutor.executeRows(dslContext ->
Flux.from(dslContext.select(MAILBOX_ID,
MAILBOX_NAME,
diff --git
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
index 9f700ac804..67f44e77ed 100644
---
a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
+++
b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
@@ -19,18 +19,63 @@
package org.apache.james.mailbox.postgres.mail;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMapperACLTest;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import com.google.common.collect.ImmutableMap;
+
class PostgresMailboxMapperACLTest extends MailboxMapperACLTest {
@RegisterExtension
static PostgresExtension postgresExtension =
PostgresExtension.withoutRowLevelSecurity(PostgresMailboxModule.MODULE);
+ private PostgresMailboxMapper mailboxMapper;
+
@Override
protected MailboxMapper createMailboxMapper() {
- return new PostgresMailboxMapper(new
PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()));
+ mailboxMapper = new PostgresMailboxMapper(new
PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()));
+ return mailboxMapper;
+ }
+
+ @Test
+ protected void updateAclShouldWorkWellInMultiThreadEnv() throws
ExecutionException, InterruptedException {
+ MailboxACL.Rfc4314Rights rights = new
MailboxACL.Rfc4314Rights(MailboxACL.Right.Administer, MailboxACL.Right.Write);
+ MailboxACL.Rfc4314Rights newRights = new
MailboxACL.Rfc4314Rights(MailboxACL.Right.Write);
+
+ ConcurrentTestRunner.builder()
+ .reactorOperation((threadNumber, step) -> {
+ int userNumber = threadNumber / 2;
+ MailboxACL.EntryKey key =
MailboxACL.EntryKey.createUserEntryKey("user" + userNumber);
+ if (threadNumber % 2 == 0) {
+ return mailboxMapper.updateACL(benwaInboxMailbox,
MailboxACL.command().key(key).rights(rights).asReplacement())
+ .then();
+ } else {
+ return mailboxMapper.updateACL(benwaInboxMailbox,
MailboxACL.command().key(key).rights(newRights).asAddition())
+ .then();
+ }
+ })
+ .threadCount(10)
+ .operationCount(1)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ MailboxACL expectedMailboxACL = new MailboxACL(IntStream.range(0,
5).boxed()
+ .collect(ImmutableMap.toImmutableMap(userNumber ->
MailboxACL.EntryKey.createUserEntryKey("user" + userNumber), userNumber ->
rights)));
+
+ assertThat(
+ mailboxMapper.findMailboxById(benwaInboxMailbox.getMailboxId())
+ .block()
+ .getACL())
+ .isEqualTo(expectedMailboxACL);
}
}
diff --git
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
index 88e87684a1..6e2f94a90f 100644
---
a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
+++
b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
@@ -43,7 +43,7 @@ public abstract class MailboxMapperACLTest {
private static final Username USER_1 = Username.of("user1");
private static final Username USER_2 = Username.of("user2");
- private Mailbox benwaInboxMailbox;
+ protected Mailbox benwaInboxMailbox;
private MailboxMapper mailboxMapper;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]