This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 7de356a4f2b4df51a0ff5114fe2c8c0943139329
Author: Benoit Tellier <[email protected]>
AuthorDate: Thu Feb 20 13:54:26 2020 +0700

    JAMES-3058 Cassandra Testing Session: barrier synchronisation
    
    This allow easy concurrency testing for Cassandra tests
---
 .../james/backends/cassandra/TestingSession.java   | 47 +++++++++++---
 .../backends/cassandra/TestingSessionTest.java     | 71 ++++++++++++++++++++++
 2 files changed, 108 insertions(+), 10 deletions(-)

diff --git 
a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java
 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java
index 5e0ad9d..2e879c8 100644
--- 
a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java
+++ 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java
@@ -20,8 +20,8 @@
 package org.apache.james.backends.cassandra;
 
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiFunction;
 import java.util.function.Predicate;
 
 import com.datastax.driver.core.BoundStatement;
@@ -36,22 +36,45 @@ import com.datastax.driver.core.Statement;
 import com.google.common.util.concurrent.ListenableFuture;
 
 public class TestingSession implements Session {
-    enum Behavior {
-        THROW((session, statement) -> {
+    @FunctionalInterface
+    interface Behavior {
+        Behavior THROW = (session, statement) -> {
             RuntimeException injected_failure = new RuntimeException("Injected 
failure");
             injected_failure.printStackTrace();
             throw injected_failure;
-        }),
-        EXECUTE_NORMALLY(Session::executeAsync);
+        };
 
-        private final BiFunction<Session, Statement, ResultSetFuture> 
behaviour;
+        Behavior EXECUTE_NORMALLY = Session::executeAsync;
 
-        Behavior(BiFunction<Session, Statement, ResultSetFuture> behaviour) {
-            this.behaviour = behaviour;
+        static Behavior awaitOn(Barrier barrier) {
+            return (session, statement) -> {
+                barrier.call();
+                return session.executeAsync(statement);
+            };
         }
 
-        ResultSetFuture execute(Session session, Statement statement) {
-            return behaviour.apply(session, statement);
+        ResultSetFuture execute(Session session, Statement statement);
+    }
+
+    public static class Barrier {
+        private final CountDownLatch callerLatch = new CountDownLatch(1);
+        private final CountDownLatch awaitCallerLatch = new CountDownLatch(1);
+
+        void awaitCaller() throws InterruptedException {
+            awaitCallerLatch.await();
+        }
+
+        void releaseCaller() {
+            callerLatch.countDown();
+        }
+
+        void call() {
+            awaitCallerLatch.countDown();
+            try {
+                callerLatch.await();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
         }
     }
 
@@ -139,6 +162,10 @@ public class TestingSession implements Session {
         return condition -> applyCount -> () -> executionHook = new 
ExecutionHook(condition, Behavior.THROW, applyCount);
     }
 
+    public RequiresCondition awaitOn(Barrier barrier) {
+        return condition -> applyCount -> () -> executionHook = new 
ExecutionHook(condition, Behavior.awaitOn(barrier), applyCount);
+    }
+
     public void resetExecutionHook() {
         executionHook = NO_EXECUTION_HOOK;
     }
diff --git 
a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
index 1983e99..c044697 100644
--- 
a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
+++ 
b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSessionTest.java
@@ -19,9 +19,11 @@
 
 package org.apache.james.backends.cassandra;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import org.apache.james.backends.cassandra.TestingSession.Barrier;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionDAO;
 import 
org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.backends.cassandra.versions.SchemaVersion;
@@ -30,6 +32,9 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 class TestingSessionTest {
     @RegisterExtension
     static CassandraClusterExtension cassandraCluster = new 
CassandraClusterExtension(CassandraSchemaVersionModule.MODULE);
@@ -158,4 +163,70 @@ class TestingSessionTest {
         assertThatThrownBy(() -> dao.getCurrentSchemaVersion().block())
             .isInstanceOf(RuntimeException.class);
     }
+
+    @Test
+    void statementShouldNotBeAppliedBeforeBarrierIsReleased(CassandraCluster 
cassandra) throws Exception {
+        SchemaVersion originalSchemaVersion = new SchemaVersion(32);
+        dao.updateVersion(originalSchemaVersion).block();
+        Barrier barrier = new Barrier();
+        cassandra.getConf()
+            .awaitOn(barrier)
+            .whenBoundStatementStartsWith("INSERT INTO schemaVersion")
+            .times(1)
+            .setExecutionHook();
+
+        dao.updateVersion(new 
SchemaVersion(36)).subscribeOn(Schedulers.elastic()).subscribe();
+
+        Thread.sleep(100);
+
+        assertThat(dao.getCurrentSchemaVersion().block())
+            .contains(originalSchemaVersion);
+    }
+
+    @Test
+    void statementShouldBeAppliedWhenBarrierIsReleased(CassandraCluster 
cassandra) throws Exception {
+        SchemaVersion originalSchemaVersion = new SchemaVersion(32);
+        SchemaVersion newVersion = new SchemaVersion(36);
+
+        dao.updateVersion(originalSchemaVersion).block();
+        Barrier barrier = new Barrier();
+        cassandra.getConf()
+            .awaitOn(barrier)
+            .whenBoundStatementStartsWith("INSERT INTO schemaVersion")
+            .times(1)
+            .setExecutionHook();
+
+        Mono<Void> operation = dao.updateVersion(newVersion).cache();
+
+        operation.subscribeOn(Schedulers.elastic()).subscribe();
+        barrier.releaseCaller();
+        operation.block();
+
+        assertThat(dao.getCurrentSchemaVersion().block())
+            .contains(newVersion);
+    }
+
+    @Test
+    void testShouldBeAbleToAwaitCaller(CassandraCluster cassandra) throws 
Exception {
+        SchemaVersion originalSchemaVersion = new SchemaVersion(32);
+        SchemaVersion newVersion = new SchemaVersion(36);
+
+        dao.updateVersion(originalSchemaVersion).block();
+        Barrier barrier = new Barrier();
+        cassandra.getConf()
+            .awaitOn(barrier)
+            .whenBoundStatementStartsWith("INSERT INTO schemaVersion")
+            .times(1)
+            .setExecutionHook();
+
+        Mono<Void> operation = dao.updateVersion(newVersion).cache();
+
+        operation.subscribeOn(Schedulers.elastic()).subscribe();
+        barrier.awaitCaller();
+        barrier.releaseCaller();
+        operation.block();
+
+        assertThat(dao.getCurrentSchemaVersion().block())
+            .contains(newVersion);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to