JAMES-2659 fix both registrationsShouldNotHandleEventsAfterStop tests

Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/713a7c54
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/713a7c54
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/713a7c54

Branch: refs/heads/master
Commit: 713a7c5450fe279ff7d60c3b29efd381f060ebc0
Parents: 52ec5db
Author: Matthieu Baechler <matth...@apache.org>
Authored: Thu Feb 7 16:02:00 2019 +0100
Committer: Matthieu Baechler <matth...@apache.org>
Committed: Fri Feb 8 10:19:55 2019 +0100

----------------------------------------------------------------------
 .../mailbox/events/RabbitMQEventBusTest.java    | 47 +++++++++++---------
 .../util/concurrency/ConcurrentTestRunner.java  | 16 ++++++-
 .../concurrency/ConcurrentTestRunnerTest.java   | 36 +++++++++++++++
 3 files changed, 78 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/713a7c54/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
----------------------------------------------------------------------
diff --git 
a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
index 0a19f9f..1dacb5f 100644
--- 
a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
+++ 
b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java
@@ -49,6 +49,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.Closeable;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
@@ -65,12 +66,12 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.mockito.stubbing.Answer;
 
 import com.rabbitmq.client.Connection;
-
 import reactor.core.publisher.Mono;
 import reactor.rabbitmq.BindingSpecification;
 import reactor.rabbitmq.ExchangeSpecification;
@@ -281,7 +282,7 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
         private final Duration TEN_SECONDS = Duration.ofSeconds(10);
 
         private static final int THREAD_COUNT = 10;
-        private static final int OPERATION_COUNT = 100;
+        private static final int OPERATION_COUNT = 100000;
         private static final int MAX_EVENT_DISPATCHED_COUNT = THREAD_COUNT * 
OPERATION_COUNT;
 
         private RabbitMQManagementAPI rabbitManagementAPI;
@@ -464,19 +465,23 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
                 MailboxListenerCountingSuccessfulExecution listener = new 
MailboxListenerCountingSuccessfulExecution();
                 eventBus.register(listener, GROUP_A);
 
-                ConcurrentTestRunner.builder()
+                try (Closeable closeable = ConcurrentTestRunner.builder()
                     .operation((threadNumber, step) -> 
eventBus.dispatch(EVENT, KEY_1))
                     .threadCount(THREAD_COUNT)
                     .operationCount(OPERATION_COUNT)
-                    .runSuccessfullyWithin(Duration.ofMinutes(1));
+                    .run()) {
 
-                eventBus.stop();
-                int callsAfterStop = listener.numberOfEventCalls();
+                    TimeUnit.SECONDS.sleep(2);
+
+                    eventBus.stop();
+                    eventBus2.stop();
+                    int callsAfterStop = listener.numberOfEventCalls();
 
-                TimeUnit.SECONDS.sleep(1);
-                assertThat(listener.numberOfEventCalls())
-                    .isEqualTo(callsAfterStop)
-                    .isLessThan(MAX_EVENT_DISPATCHED_COUNT);
+                    TimeUnit.SECONDS.sleep(1);
+                    assertThat(listener.numberOfEventCalls())
+                        .isEqualTo(callsAfterStop)
+                        .isLessThanOrEqualTo(MAX_EVENT_DISPATCHED_COUNT);
+                }
             }
         }
 
@@ -544,7 +549,6 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
                     .isEmpty();
             }
 
-            @Disabled("JAMES-2659 instable test")
             @Test
             void registrationsShouldNotHandleEventsAfterStop() throws 
Exception {
                 eventBus.start();
@@ -554,20 +558,23 @@ class RabbitMQEventBusTest implements 
GroupContract.SingleEventBusGroupContract,
                 eventBus.register(listener, GROUP_A);
                 eventBus2.register(listener, GROUP_A);
 
-                ConcurrentTestRunner.builder()
+                try (Closeable closeable = ConcurrentTestRunner.builder()
                     .operation((threadNumber, step) -> 
eventBus.dispatch(EVENT, KEY_1))
                     .threadCount(THREAD_COUNT)
                     .operationCount(OPERATION_COUNT)
-                    .runSuccessfullyWithin(TEN_SECONDS);
+                    .run()) {
 
-                eventBus.stop();
-                eventBus2.stop();
-                int callsAfterStop = listener.numberOfEventCalls();
+                    TimeUnit.SECONDS.sleep(2);
+
+                    eventBus.stop();
+                    eventBus2.stop();
+                    int callsAfterStop = listener.numberOfEventCalls();
 
-                TimeUnit.SECONDS.sleep(1);
-                assertThat(listener.numberOfEventCalls())
-                    .isEqualTo(callsAfterStop)
-                    .isLessThan(MAX_EVENT_DISPATCHED_COUNT);
+                    TimeUnit.SECONDS.sleep(1);
+                    assertThat(listener.numberOfEventCalls())
+                        .isEqualTo(callsAfterStop)
+                        .isLessThanOrEqualTo(MAX_EVENT_DISPATCHED_COUNT);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/james-project/blob/713a7c54/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
----------------------------------------------------------------------
diff --git 
a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
 
b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
index 080198d..7ff1fbc 100644
--- 
a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
+++ 
b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.util.concurrency;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -37,7 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
-public class ConcurrentTestRunner {
+public class ConcurrentTestRunner implements Closeable {
 
     public static final int DEFAULT_OPERATION_COUNT = 1;
 
@@ -78,6 +80,12 @@ public class ConcurrentTestRunner {
                 operation);
         }
 
+        public ConcurrentTestRunner run() {
+            ConcurrentTestRunner testRunner = build();
+            testRunner.run();
+            return testRunner;
+        }
+
         public ConcurrentTestRunner runSuccessfullyWithin(Duration duration) 
throws InterruptedException, ExecutionException {
             return build()
                 .runSuccessfullyWithin(duration);
@@ -181,4 +189,10 @@ public class ConcurrentTestRunner {
         return run()
             .awaitTermination(duration);
     }
+
+
+    @Override
+    public void close() throws IOException {
+        executorService.shutdownNow();
+    }
 }

http://git-wip-us.apache.org/repos/asf/james-project/blob/713a7c54/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
 
b/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
index 2a2f81e..1381694 100644
--- 
a/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
+++ 
b/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java
@@ -23,9 +23,12 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.time.Duration;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.jupiter.api.Test;
 
@@ -106,6 +109,39 @@ public class ConcurrentTestRunnerTest {
         ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
 
         assertThatCode(() -> ConcurrentTestRunner.builder()
+            .operation((threadNumber, step) -> queue.add(threadNumber + ":" + 
step))
+            .threadCount(2)
+            .operationCount(2)
+            .run()
+            .awaitTermination(Duration.ofSeconds(1)))
+            .doesNotThrowAnyException();
+
+        assertThat(queue).containsOnly("0:0", "0:1", "1:0", "1:1");
+    }
+
+    @Test
+    void closeShouldPreventPerformAllOperations() throws IOException, 
InterruptedException {
+        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
+
+        int maxItems = 200000;
+        Closeable closeable = ConcurrentTestRunner.builder()
+            .operation((threadNumber, step) -> queue.add(threadNumber + ":" + 
step))
+            .threadCount(2)
+            .operationCount(maxItems)
+            .run();
+        closeable.close();
+        TimeUnit.SECONDS.sleep(1);
+        int stabilizedItemCount = queue.size();
+        assertThat(stabilizedItemCount).isLessThanOrEqualTo(maxItems * 2);
+        TimeUnit.SECONDS.sleep(1);
+        assertThat(queue).hasSize(stabilizedItemCount);
+    }
+
+    @Test
+    void runSuccessfullyWithinShouldPerformAllOperations() {
+        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
+
+        assertThatCode(() -> ConcurrentTestRunner.builder()
                 .operation((threadNumber, step) -> queue.add(threadNumber + 
":" + step))
                 .threadCount(2)
                 .operationCount(2)


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org
For additional commands, e-mail: server-dev-h...@james.apache.org

Reply via email to