This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 4a2603eab8 [FIX] make `updateACL` thread-safe in InMemoryMailboxMapper
4a2603eab8 is described below
commit 4a2603eab8457a95b1fe54aaabfe7a25ee86123d
Author: TungTV <[email protected]>
AuthorDate: Wed Feb 19 09:15:58 2025 +0700
[FIX] make `updateACL` thread-safe in InMemoryMailboxMapper
---
.../inmemory/mail/InMemoryMailboxMapper.java | 74 ++++++++++++++--------
.../mail/PostgresMailboxMapperACLTest.java | 42 ------------
.../RLSSupportPostgresMailboxMapperACLTest.java | 8 +++
.../store/mail/model/MailboxMapperACLTest.java | 36 +++++++++++
4 files changed, 93 insertions(+), 67 deletions(-)
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java
index 6bc71804c0..3b97058ccf 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMailboxMapper.java
@@ -27,6 +27,7 @@ import org.apache.james.core.Username;
import org.apache.james.mailbox.acl.ACLDiff;
import org.apache.james.mailbox.exception.MailboxExistsException;
import org.apache.james.mailbox.exception.MailboxNotFoundException;
+import org.apache.james.mailbox.exception.UnsupportedRightException;
import org.apache.james.mailbox.inmemory.InMemoryId;
import org.apache.james.mailbox.model.Mailbox;
import org.apache.james.mailbox.model.MailboxACL;
@@ -47,31 +48,33 @@ import reactor.core.publisher.Mono;
public class InMemoryMailboxMapper implements MailboxMapper {
private static final int INITIAL_SIZE = 128;
- private final ConcurrentHashMap<MailboxPath, Mailbox> mailboxesByPath;
+ private final ConcurrentHashMap<MailboxId, Mailbox> mailboxesById;
private final AtomicLong mailboxIdGenerator = new AtomicLong();
public InMemoryMailboxMapper() {
- mailboxesByPath = new ConcurrentHashMap<>(INITIAL_SIZE);
+ mailboxesById = new ConcurrentHashMap<>(INITIAL_SIZE);
}
@Override
public Mono<Void> delete(Mailbox mailbox) {
- return Mono.fromRunnable(() ->
mailboxesByPath.remove(mailbox.generateAssociatedPath()));
+ return Mono.fromRunnable(() ->
mailboxesById.remove(mailbox.getMailboxId()));
}
public Mono<Void> deleteAll() {
- return Mono.fromRunnable(mailboxesByPath::clear);
+ return Mono.fromRunnable(mailboxesById::clear);
}
@Override
public Mono<Mailbox> findMailboxByPath(MailboxPath path) {
- return Mono.defer(() -> Mono.justOrEmpty(mailboxesByPath.get(path)))
+ return Flux.fromIterable(mailboxesById.values())
+ .filter(mailbox -> path.equals(mailbox.generateAssociatedPath()))
+ .next()
.map(Mailbox::new);
}
@Override
public Mono<Mailbox> findMailboxById(MailboxId id) {
- return Mono.fromCallable(mailboxesByPath::values)
+ return Mono.fromCallable(mailboxesById::values)
.flatMapIterable(Function.identity())
.filter(mailbox -> mailbox.getMailboxId().equals(id))
.next()
@@ -81,7 +84,7 @@ public class InMemoryMailboxMapper implements MailboxMapper {
@Override
public Flux<Mailbox> findMailboxWithPathLike(MailboxQuery.UserBound query)
{
- return Mono.fromCallable(mailboxesByPath::values)
+ return Mono.fromCallable(mailboxesById::values)
.flatMapIterable(Function.identity())
.filter(query::matches)
.map(Mailbox::new);
@@ -92,30 +95,40 @@ public class InMemoryMailboxMapper implements MailboxMapper {
InMemoryId id = InMemoryId.of(mailboxIdGenerator.incrementAndGet());
Mailbox mailbox = new Mailbox(mailboxPath, uidValidity, id);
- return saveMailbox(mailbox)
- .thenReturn(mailbox);
+ return existsMailboxPath(mailboxPath)
+ .flatMap(exist -> {
+ if (exist) {
+ return Mono.error(new
MailboxExistsException(mailboxPath.getName()));
+ }
+ return saveMailbox(mailbox)
+ .thenReturn(mailbox);
+ });
}
@Override
public Mono<MailboxId> rename(Mailbox mailbox) {
Preconditions.checkNotNull(mailbox.getMailboxId(), "A mailbox we want
to rename should have a defined mailboxId");
- InMemoryId id = (InMemoryId) mailbox.getMailboxId();
- return findMailboxById(id)
- .flatMap(mailboxWithPreviousName -> saveMailbox(mailbox)
- .then(Mono.fromCallable(() ->
mailboxesByPath.remove(mailboxWithPreviousName.generateAssociatedPath()))))
- .thenReturn(mailbox.getMailboxId());
+ return existsMailboxPath(mailbox.generateAssociatedPath())
+ .flatMap(exist -> {
+ if (exist) {
+ return Mono.error(new
MailboxExistsException(mailbox.generateAssociatedPath().getName()));
+ }
+ return Mono.defer(() ->
Mono.justOrEmpty(mailboxesById.computeIfPresent(mailbox.getMailboxId(),
(mailboxId, existingMailbox) -> mailbox))
+ .map(Mailbox::getMailboxId)
+ .switchIfEmpty(Mono.error(new
MailboxNotFoundException(mailbox.getMailboxId()))));
+ });
}
private Mono<Void> saveMailbox(Mailbox mailbox) {
- return Mono.defer(() ->
Mono.justOrEmpty(mailboxesByPath.putIfAbsent(mailbox.generateAssociatedPath(),
mailbox)))
+ return Mono.defer(() ->
Mono.justOrEmpty(mailboxesById.putIfAbsent(mailbox.getMailboxId(), mailbox)))
.flatMap(ignored -> Mono.error(new
MailboxExistsException(mailbox.getName())));
}
@Override
public Mono<Boolean> hasChildren(Mailbox mailbox, char delimiter) {
String mailboxName = mailbox.getName() + delimiter;
- return Mono.fromCallable(mailboxesByPath::values)
+ return Mono.fromCallable(mailboxesById::values)
.flatMapIterable(Function.identity())
.filter(box -> belongsToSameUser(mailbox, box) &&
box.getName().startsWith(mailboxName))
.hasElements();
@@ -128,32 +141,39 @@ public class InMemoryMailboxMapper implements MailboxMapper {
@Override
public Flux<Mailbox> list() {
- return Mono.fromCallable(mailboxesByPath::values)
+ return Mono.fromCallable(mailboxesById::values)
.flatMapIterable(Function.identity());
}
@Override
public Mono<ACLDiff> updateACL(Mailbox mailbox, MailboxACL.ACLCommand
mailboxACLCommand) {
- return Mono.fromCallable(() -> {
- MailboxACL oldACL = mailbox.getACL();
- MailboxACL newACL = mailbox.getACL().apply(mailboxACLCommand);
-
mailboxesByPath.get(mailbox.generateAssociatedPath()).setACL(newACL);
- return ACLDiff.computeDiff(oldACL, newACL);
- });
+ return Mono.just(mailbox.getACL())
+ .flatMap(oldACL -> Mono.fromCallable(() ->
mailboxesById.compute(mailbox.getMailboxId(), (mailboxId, existingMailbox) -> {
+ if (existingMailbox == null) {
+ throw new IllegalArgumentException("Mailbox not found
for id: " + mailboxId);
+ }
+ try {
+
existingMailbox.setACL(existingMailbox.getACL().apply(mailboxACLCommand));
+ } catch (UnsupportedRightException e) {
+ throw new RuntimeException("ACL update failed", e);
+ }
+ return existingMailbox;
+ }))
+ .map(updatedMailbox -> ACLDiff.computeDiff(oldACL,
updatedMailbox.getACL())));
}
@Override
public Mono<ACLDiff> setACL(Mailbox mailbox, MailboxACL mailboxACL) {
return Mono.fromCallable(() -> {
MailboxACL oldMailboxAcl = mailbox.getACL();
-
mailboxesByPath.get(mailbox.generateAssociatedPath()).setACL(mailboxACL);
+ mailboxesById.get(mailbox.getMailboxId()).setACL(mailboxACL);
return ACLDiff.computeDiff(oldMailboxAcl, mailboxACL);
});
}
@Override
public Flux<Mailbox> findNonPersonalMailboxes(Username userName, Right
right) {
- return Mono.fromCallable(mailboxesByPath::values)
+ return Mono.fromCallable(mailboxesById::values)
.flatMapIterable(Function.identity())
.filter(mailbox -> hasRightOn(mailbox, userName, right));
}
@@ -166,4 +186,8 @@ public class InMemoryMailboxMapper implements MailboxMapper {
.map(rights -> rights.contains(right))
.orElse(false);
}
+
+ private Mono<Boolean> existsMailboxPath(MailboxPath mailboxPath) {
+ return findMailboxByPath(mailboxPath).hasElement();
+ }
}
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
index 67f44e77ed..1c59824bbf 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/PostgresMailboxMapperACLTest.java
@@ -19,23 +19,12 @@
package org.apache.james.mailbox.postgres.mail;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.time.Duration;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.IntStream;
-
import org.apache.james.backends.postgres.PostgresExtension;
-import org.apache.james.mailbox.model.MailboxACL;
import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMapperACLTest;
-import org.apache.james.util.concurrency.ConcurrentTestRunner;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import com.google.common.collect.ImmutableMap;
-
class PostgresMailboxMapperACLTest extends MailboxMapperACLTest {
@RegisterExtension
static PostgresExtension postgresExtension =
PostgresExtension.withoutRowLevelSecurity(PostgresMailboxModule.MODULE);
@@ -47,35 +36,4 @@ class PostgresMailboxMapperACLTest extends MailboxMapperACLTest {
mailboxMapper = new PostgresMailboxMapper(new
PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()));
return mailboxMapper;
}
-
- @Test
- protected void updateAclShouldWorkWellInMultiThreadEnv() throws
ExecutionException, InterruptedException {
- MailboxACL.Rfc4314Rights rights = new
MailboxACL.Rfc4314Rights(MailboxACL.Right.Administer, MailboxACL.Right.Write);
- MailboxACL.Rfc4314Rights newRights = new
MailboxACL.Rfc4314Rights(MailboxACL.Right.Write);
-
- ConcurrentTestRunner.builder()
- .reactorOperation((threadNumber, step) -> {
- int userNumber = threadNumber / 2;
- MailboxACL.EntryKey key =
MailboxACL.EntryKey.createUserEntryKey("user" + userNumber);
- if (threadNumber % 2 == 0) {
- return mailboxMapper.updateACL(benwaInboxMailbox,
MailboxACL.command().key(key).rights(rights).asReplacement())
- .then();
- } else {
- return mailboxMapper.updateACL(benwaInboxMailbox,
MailboxACL.command().key(key).rights(newRights).asAddition())
- .then();
- }
- })
- .threadCount(10)
- .operationCount(1)
- .runSuccessfullyWithin(Duration.ofMinutes(1));
-
- MailboxACL expectedMailboxACL = new MailboxACL(IntStream.range(0,
5).boxed()
- .collect(ImmutableMap.toImmutableMap(userNumber ->
MailboxACL.EntryKey.createUserEntryKey("user" + userNumber), userNumber ->
rights)));
-
- assertThat(
- mailboxMapper.findMailboxById(benwaInboxMailbox.getMailboxId())
- .block()
- .getACL())
- .isEqualTo(expectedMailboxACL);
- }
}
diff --git a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/RLSSupportPostgresMailboxMapperACLTest.java b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/RLSSupportPostgresMailboxMapperACLTest.java
index 1352f0b74e..9811865f8d 100644
--- a/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/RLSSupportPostgresMailboxMapperACLTest.java
+++ b/mailbox/postgres/src/test/java/org/apache/james/mailbox/postgres/mail/RLSSupportPostgresMailboxMapperACLTest.java
@@ -19,11 +19,14 @@
package org.apache.james.mailbox.postgres.mail;
+import java.util.concurrent.ExecutionException;
+
import org.apache.james.backends.postgres.PostgresExtension;
import org.apache.james.backends.postgres.PostgresModule;
import org.apache.james.mailbox.postgres.mail.dao.PostgresMailboxDAO;
import org.apache.james.mailbox.store.mail.MailboxMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMapperACLTest;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.RegisterExtension;
class RLSSupportPostgresMailboxMapperACLTest extends MailboxMapperACLTest {
@@ -36,4 +39,9 @@ class RLSSupportPostgresMailboxMapperACLTest extends MailboxMapperACLTest {
return new RLSSupportPostgresMailboxMapper(new
PostgresMailboxDAO(postgresExtension.getDefaultPostgresExecutor()),
new
PostgresMailboxMemberDAO(postgresExtension.getDefaultPostgresExecutor()));
}
+
+ @Override
+ @Disabled("not yet implemented")
+ protected void updateAclShouldWorkWellInMultiThreadEnv() throws
ExecutionException, InterruptedException {
+ }
}
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
index 6e2f94a90f..d0e9ebccaf 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MailboxMapperACLTest.java
@@ -21,6 +21,10 @@ package org.apache.james.mailbox.store.mail.model;
import static org.assertj.core.api.Assertions.assertThat;
+import java.time.Duration;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.IntStream;
+
import org.apache.james.core.Username;
import org.apache.james.mailbox.acl.ACLDiff;
import org.apache.james.mailbox.exception.MailboxException;
@@ -32,6 +36,7 @@ import org.apache.james.mailbox.model.MailboxACL.Right;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.model.UidValidity;
import org.apache.james.mailbox.store.mail.MailboxMapper;
+import org.apache.james.util.concurrency.ConcurrentTestRunner;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -443,4 +448,35 @@ public abstract class MailboxMapperACLTest {
assertThat(mailboxMapper.updateACL(benwaInboxMailbox,
aclCommand).block()).isEqualTo(expectAclDiff);
}
+
+ @Test
+ protected void updateAclShouldWorkWellInMultiThreadEnv() throws
ExecutionException, InterruptedException {
+ MailboxACL.Rfc4314Rights rights = new
MailboxACL.Rfc4314Rights(MailboxACL.Right.Administer, MailboxACL.Right.Write);
+ MailboxACL.Rfc4314Rights newRights = new
MailboxACL.Rfc4314Rights(MailboxACL.Right.Write);
+
+ ConcurrentTestRunner.builder()
+ .reactorOperation((threadNumber, step) -> {
+ int userNumber = threadNumber / 2;
+ MailboxACL.EntryKey key =
MailboxACL.EntryKey.createUserEntryKey("user" + userNumber);
+ if (threadNumber % 2 == 0) {
+ return mailboxMapper.updateACL(benwaInboxMailbox,
MailboxACL.command().key(key).rights(rights).asReplacement())
+ .then();
+ } else {
+ return mailboxMapper.updateACL(benwaInboxMailbox,
MailboxACL.command().key(key).rights(newRights).asAddition())
+ .then();
+ }
+ })
+ .threadCount(10)
+ .operationCount(1)
+ .runSuccessfullyWithin(Duration.ofMinutes(1));
+
+ MailboxACL expectedMailboxACL = new MailboxACL(IntStream.range(0,
5).boxed()
+ .collect(ImmutableMap.toImmutableMap(userNumber ->
MailboxACL.EntryKey.createUserEntryKey("user" + userNumber), userNumber ->
rights)));
+
+ assertThat(
+ mailboxMapper.findMailboxById(benwaInboxMailbox.getMailboxId())
+ .block()
+ .getACL())
+ .isEqualTo(expectedMailboxACL);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]