This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 6e16c097c1aa77374b056342036c8f2357f3e509 Author: Benoit Tellier <[email protected]> AuthorDate: Thu Feb 20 16:52:00 2020 +0700 JAMES-3058 Concurrency test for CassandraAclMapper This can be done without code injection in business logic thanks to Cassandra statement instrumentation. --- .../james/backends/cassandra/TestingSession.java | 14 ++++++-- .../mailbox/cassandra/mail/CassandraACLMapper.java | 16 ++-------- .../cassandra/mail/CassandraACLMapperTest.java | 37 +++++++++++++++++----- 3 files changed, 42 insertions(+), 25 deletions(-) diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java index 2e879c8..d102148 100644 --- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java +++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java @@ -58,13 +58,21 @@ public class TestingSession implements Session { public static class Barrier { private final CountDownLatch callerLatch = new CountDownLatch(1); - private final CountDownLatch awaitCallerLatch = new CountDownLatch(1); + private final CountDownLatch awaitCallerLatch; - void awaitCaller() throws InterruptedException { + public Barrier() { + this(1); + } + + public Barrier(int callerCount) { + awaitCallerLatch = new CountDownLatch(callerCount); + } + + public void awaitCaller() throws InterruptedException { awaitCallerLatch.await(); } - void releaseCaller() { + public void releaseCaller() { callerLatch.countDown(); } diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java index c877d9a..4a2734b 100644 --- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java +++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java @@ -58,14 +58,8 @@ public class CassandraACLMapper { private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class); private static final String OLD_VERSION = "oldVersion"; - @FunctionalInterface - public interface CodeInjector { - void inject(); - } - private final CassandraAsyncExecutor executor; private final int maxAclRetry; - private final CodeInjector codeInjector; private final CassandraUserMailboxRightsDAO userMailboxRightsDAO; private final PreparedStatement conditionalInsertStatement; private final PreparedStatement conditionalUpdateStatement; @@ -73,13 +67,8 @@ public class CassandraACLMapper { @Inject public CassandraACLMapper(Session session, CassandraUserMailboxRightsDAO userMailboxRightsDAO, CassandraConfiguration cassandraConfiguration) { - this(session, userMailboxRightsDAO, cassandraConfiguration, () -> { }); - } - - public CassandraACLMapper(Session session, CassandraUserMailboxRightsDAO userMailboxRightsDAO, CassandraConfiguration cassandraConfiguration, CodeInjector codeInjector) { this.executor = new CassandraAsyncExecutor(session); this.maxAclRetry = cassandraConfiguration.getAclMaxRetry(); - this.codeInjector = codeInjector; this.conditionalInsertStatement = prepareConditionalInsert(session); this.conditionalUpdateStatement = prepareConditionalUpdate(session); this.readStatement = prepareReadStatement(session); @@ -139,9 +128,8 @@ public class CassandraACLMapper { .orElseThrow(() -> new MailboxException("Unable to update ACL")); } - private Mono<ACLDiff> updateAcl(CassandraId cassandraId, Function<ACLWithVersion, ACLWithVersion> aclTransformation, MailboxACL replacement) throws MailboxException { - return Mono.fromRunnable(() -> codeInjector.inject()) - .then(Mono.defer(() -> getAclWithVersion(cassandraId))) + private Mono<ACLDiff> updateAcl(CassandraId cassandraId, Function<ACLWithVersion, ACLWithVersion> aclTransformation, MailboxACL replacement) { + return getAclWithVersion(cassandraId) .flatMap(aclWithVersion -> updateStoredACL(cassandraId, aclTransformation.apply(aclWithVersion)) .map(newACL -> ACLDiff.computeDiff(aclWithVersion.mailboxACL, newACL))) diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java index d112645..025a41a 100644 --- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java +++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeoutException; import org.apache.james.backends.cassandra.CassandraCluster; import org.apache.james.backends.cassandra.CassandraClusterExtension; +import org.apache.james.backends.cassandra.TestingSession.Barrier; import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration; import org.apache.james.backends.cassandra.utils.CassandraUtils; import org.apache.james.mailbox.cassandra.ids.CassandraId; @@ -160,12 +161,22 @@ class CassandraACLMapperTest { @Test void twoConcurrentUpdatesWhenNoACLStoredShouldReturnACLWithTwoEntries(CassandraCluster cassandra) throws Exception { - CountDownLatch countDownLatch = new CountDownLatch(2); + Barrier barrier = new Barrier(2); + cassandra.getConf() + .awaitOn(barrier) + .whenBoundStatementStartsWith("SELECT acl,version FROM acl WHERE id=:id;") + .times(2) + .setExecutionHook(); + MailboxACL.EntryKey keyBob = new MailboxACL.EntryKey("bob", MailboxACL.NameType.user, false); MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Read); MailboxACL.EntryKey keyAlice = new MailboxACL.EntryKey("alice", MailboxACL.NameType.user, false); - Future<Boolean> future1 = performACLUpdateInExecutor(cassandra, executor, keyBob, rights, countDownLatch::countDown); - Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights, countDownLatch::countDown); + Future<Boolean> future1 = performACLUpdateInExecutor(cassandra, executor, keyBob, rights); + Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights); + + barrier.awaitCaller(); + barrier.releaseCaller(); + awaitAll(future1, future2); assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()) @@ -179,10 +190,21 @@ class CassandraACLMapperTest { MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Read); cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(keyBenwa).rights(rights).asAddition()); + Barrier barrier = new Barrier(2); + cassandra.getConf() + .awaitOn(barrier) + .whenBoundStatementStartsWith("SELECT acl,version FROM acl WHERE id=:id;") + .times(2) + .setExecutionHook(); + MailboxACL.EntryKey keyBob = new MailboxACL.EntryKey("bob", MailboxACL.NameType.user, false); MailboxACL.EntryKey keyAlice = new MailboxACL.EntryKey("alice", MailboxACL.NameType.user, false); - Future<Boolean> future1 = performACLUpdateInExecutor(cassandra, executor, keyBob, rights, countDownLatch::countDown); - Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights, countDownLatch::countDown); + Future<Boolean> future1 = performACLUpdateInExecutor(cassandra, executor, keyBob, rights); + Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights); + + barrier.awaitCaller(); + barrier.releaseCaller(); + awaitAll(future1, future2); assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block()) @@ -196,13 +218,12 @@ class CassandraACLMapperTest { } } - private Future<Boolean> performACLUpdateInExecutor(CassandraCluster cassandra, ExecutorService executor, MailboxACL.EntryKey key, MailboxACL.Rfc4314Rights rights, CassandraACLMapper.CodeInjector runnable) { + private Future<Boolean> performACLUpdateInExecutor(CassandraCluster cassandra, ExecutorService executor, MailboxACL.EntryKey key, MailboxACL.Rfc4314Rights rights) { return executor.submit(() -> { CassandraACLMapper aclMapper = new CassandraACLMapper( cassandra.getConf(), new CassandraUserMailboxRightsDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION), - CassandraConfiguration.DEFAULT_CONFIGURATION, - runnable); + CassandraConfiguration.DEFAULT_CONFIGURATION); try { aclMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asAddition()); } catch (MailboxException exception) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
