JAMES-2659 fix both registrationsShouldNotHandleEventsAfterStop tests
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/713a7c54 Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/713a7c54 Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/713a7c54 Branch: refs/heads/master Commit: 713a7c5450fe279ff7d60c3b29efd381f060ebc0 Parents: 52ec5db Author: Matthieu Baechler <matth...@apache.org> Authored: Thu Feb 7 16:02:00 2019 +0100 Committer: Matthieu Baechler <matth...@apache.org> Committed: Fri Feb 8 10:19:55 2019 +0100 ---------------------------------------------------------------------- .../mailbox/events/RabbitMQEventBusTest.java | 47 +++++++++++--------- .../util/concurrency/ConcurrentTestRunner.java | 16 ++++++- .../concurrency/ConcurrentTestRunnerTest.java | 36 +++++++++++++++ 3 files changed, 78 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/james-project/blob/713a7c54/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java ---------------------------------------------------------------------- diff --git a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java index 0a19f9f..1dacb5f 100644 --- a/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java +++ b/mailbox/event/event-rabbitmq/src/test/java/org/apache/james/mailbox/events/RabbitMQEventBusTest.java @@ -49,6 +49,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.Closeable; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.TimeUnit; @@ -65,12 +66,12 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.mockito.stubbing.Answer; import com.rabbitmq.client.Connection; - import reactor.core.publisher.Mono; import reactor.rabbitmq.BindingSpecification; import reactor.rabbitmq.ExchangeSpecification; @@ -281,7 +282,7 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, private final Duration TEN_SECONDS = Duration.ofSeconds(10); private static final int THREAD_COUNT = 10; - private static final int OPERATION_COUNT = 100; + private static final int OPERATION_COUNT = 100000; private static final int MAX_EVENT_DISPATCHED_COUNT = THREAD_COUNT * OPERATION_COUNT; private RabbitMQManagementAPI rabbitManagementAPI; @@ -464,19 +465,23 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, MailboxListenerCountingSuccessfulExecution listener = new MailboxListenerCountingSuccessfulExecution(); eventBus.register(listener, GROUP_A); - ConcurrentTestRunner.builder() + try (Closeable closeable = ConcurrentTestRunner.builder() .operation((threadNumber, step) -> eventBus.dispatch(EVENT, KEY_1)) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) - .runSuccessfullyWithin(Duration.ofMinutes(1)); + .run()) { - eventBus.stop(); - int callsAfterStop = listener.numberOfEventCalls(); + TimeUnit.SECONDS.sleep(2); + + eventBus.stop(); + eventBus2.stop(); + int callsAfterStop = listener.numberOfEventCalls(); - TimeUnit.SECONDS.sleep(1); - assertThat(listener.numberOfEventCalls()) - .isEqualTo(callsAfterStop) - .isLessThan(MAX_EVENT_DISPATCHED_COUNT); + TimeUnit.SECONDS.sleep(1); + assertThat(listener.numberOfEventCalls()) + .isEqualTo(callsAfterStop) + .isLessThanOrEqualTo(MAX_EVENT_DISPATCHED_COUNT); + } } } @@ -544,7 +549,6 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, .isEmpty(); } - @Disabled("JAMES-2659 instable test") @Test void registrationsShouldNotHandleEventsAfterStop() throws Exception { eventBus.start(); @@ -554,20 +558,23 @@ class RabbitMQEventBusTest implements GroupContract.SingleEventBusGroupContract, eventBus.register(listener, GROUP_A); eventBus2.register(listener, GROUP_A); - ConcurrentTestRunner.builder() + try (Closeable closeable = ConcurrentTestRunner.builder() .operation((threadNumber, step) -> eventBus.dispatch(EVENT, KEY_1)) .threadCount(THREAD_COUNT) .operationCount(OPERATION_COUNT) - .runSuccessfullyWithin(TEN_SECONDS); + .run()) { - eventBus.stop(); - eventBus2.stop(); - int callsAfterStop = listener.numberOfEventCalls(); + TimeUnit.SECONDS.sleep(2); + + eventBus.stop(); + eventBus2.stop(); + int callsAfterStop = listener.numberOfEventCalls(); - TimeUnit.SECONDS.sleep(1); - assertThat(listener.numberOfEventCalls()) - .isEqualTo(callsAfterStop) - .isLessThan(MAX_EVENT_DISPATCHED_COUNT); + TimeUnit.SECONDS.sleep(1); + assertThat(listener.numberOfEventCalls()) + .isEqualTo(callsAfterStop) + .isLessThanOrEqualTo(MAX_EVENT_DISPATCHED_COUNT); + } } } http://git-wip-us.apache.org/repos/asf/james-project/blob/713a7c54/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java index 080198d..7ff1fbc 100644 --- a/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java +++ b/server/container/util/src/main/java/org/apache/james/util/concurrency/ConcurrentTestRunner.java @@ -19,6 +19,8 @@ package org.apache.james.util.concurrency; +import java.io.Closeable; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -37,7 +39,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; -public class ConcurrentTestRunner { +public class ConcurrentTestRunner implements Closeable { public static final int DEFAULT_OPERATION_COUNT = 1; @@ -78,6 +80,12 @@ public class ConcurrentTestRunner { operation); } + public ConcurrentTestRunner run() { + ConcurrentTestRunner testRunner = build(); + testRunner.run(); + return testRunner; + } + public ConcurrentTestRunner runSuccessfullyWithin(Duration duration) throws InterruptedException, ExecutionException { return build() .runSuccessfullyWithin(duration); @@ -181,4 +189,10 @@ public class ConcurrentTestRunner { return run() .awaitTermination(duration); } + + + @Override + public void close() throws IOException { + executorService.shutdownNow(); + } } http://git-wip-us.apache.org/repos/asf/james-project/blob/713a7c54/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java ---------------------------------------------------------------------- diff --git a/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java b/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java index 2a2f81e..1381694 100644 --- a/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java +++ b/server/container/util/src/test/java/org/apache/james/util/concurrency/ConcurrentTestRunnerTest.java @@ -23,9 +23,12 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.io.Closeable; +import java.io.IOException; import java.time.Duration; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -106,6 +109,39 @@ public class ConcurrentTestRunnerTest { ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); assertThatCode(() -> ConcurrentTestRunner.builder() + .operation((threadNumber, step) -> queue.add(threadNumber + ":" + step)) + .threadCount(2) + .operationCount(2) + .run() + .awaitTermination(Duration.ofSeconds(1))) + .doesNotThrowAnyException(); + + assertThat(queue).containsOnly("0:0", "0:1", "1:0", "1:1"); + } + + @Test + void closeShouldPreventPerformAllOperations() throws IOException, InterruptedException { + ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); + + int maxItems = 200000; + Closeable closeable = ConcurrentTestRunner.builder() + .operation((threadNumber, step) -> queue.add(threadNumber + ":" + step)) + .threadCount(2) + .operationCount(maxItems) + .run(); + closeable.close(); + TimeUnit.SECONDS.sleep(1); + int stabilizedItemCount = queue.size(); + assertThat(stabilizedItemCount).isLessThanOrEqualTo(maxItems * 2); + TimeUnit.SECONDS.sleep(1); + assertThat(queue).hasSize(stabilizedItemCount); + } + + @Test + void runSuccessfullyWithinShouldPerformAllOperations() { + ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); + + assertThatCode(() -> ConcurrentTestRunner.builder() .operation((threadNumber, step) -> queue.add(threadNumber + ":" + step)) .threadCount(2) .operationCount(2) --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org