This is an automated email from the ASF dual-hosted git repository. nfilotto pushed a commit to branch CAMEL-17948/race-condition-in-mockendpoint in repository https://gitbox.apache.org/repos/asf/camel.git
commit bd5a729c3b4c2a3d54166cd0c0c7a474d902e351 Author: Nicolas Filotto <[email protected]> AuthorDate: Tue Apr 12 14:41:46 2022 +0200 CAMEL-17948: Force waiting before checking the asserts of the mock --- .../apache/camel/component/mock/MockEndpoint.java | 76 +++++++++++++++++----- 1 file changed, 61 insertions(+), 15 deletions(-) diff --git a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java index 7eb7873ce0c..f7e0ede24f7 100644 --- a/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java +++ b/components/camel-mock/src/main/java/org/apache/camel/component/mock/MockEndpoint.java @@ -30,6 +30,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; @@ -117,7 +118,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, private volatile Map<String, Object> actualHeaderValues; private volatile Map<String, Object> expectedPropertyValues; - private volatile int counter; + private final AtomicInteger counter = new AtomicInteger(); @UriPath(description = "Name of mock endpoint") @Metadata(required = true) @@ -311,7 +312,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, public void reset() { expectedCount = -1; - counter = 0; + counter.set(0); defaultProcessor = null; processors = new HashMap<>(); receivedExchanges = new CopyOnWriteArrayList<>(); @@ -441,13 +442,15 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, } assertEquals("Received message count", expectedCount, getReceivedCounter()); } else if (expectedCount > 0) { - if (expectedCount != getReceivedCounter()) { - waitForCompleteLatch(); - } + // Always wait whatever the value of the received counter to ensure that all expected messages are + // fully processed (until the latch countDown) + waitForCompleteLatch(); if (failFastAssertionError == null) { assertEquals("Received message count", expectedCount, getReceivedCounter()); } - } else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) { + } else if (expectedMinimumCount > 0) { + // Always wait whatever the value of the received counter to ensure that all expected messages are + // fully processed (until the latch countDown) waitForCompleteLatch(); } @@ -583,10 +586,10 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, if (factory != null) { expectedHeaderValues = factory.newMap(); } else { - // should not really happen but some tests dont start camel context + // should not really happen but some tests don't start camel context expectedHeaderValues = new HashMap<>(); } - // we just wants to expects to be called once + // we just wants to expect to be called once expects(new AssertionTask() { @Override public void assertOnIndex(int i) { @@ -619,6 +622,29 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, expectedHeaderValues.put(name, value); } + /** + * Sets an expectation that no header is expected in the messages received by this endpoint + */ + public void expectedNoHeaderReceived() { + if (expectedMinimumCount == -1 && expectedCount <= 0) { + expectedMinimumMessageCount(1); + } + // we just wants to expect to be called once + expects(new AssertionTask() { + @Override + public void assertOnIndex(int i) { + Exchange exchange = getReceivedExchange(i); + assertFalse("Exchange " + i + " has headers", exchange.getIn().hasHeaders()); + } + + public void run() { + for (int i = 0; i < getReceivedExchanges().size(); i++) { + assertOnIndex(i); + } + } + }); + } + /** * Adds an expectation that the given header values are received by this endpoint in any order. * <p/> @@ -1344,7 +1370,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, } public int getReceivedCounter() { - return counter; + return counter.get(); } public List<Exchange> getReceivedExchanges() { @@ -1574,7 +1600,7 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, try { if (log) { String line = getComponent().getExchangeFormatter().format(exchange); - LOG.info("mock:{} received #{} -> {}", getName(), counter + 1, line); + LOG.info("mock:{} received #{} -> {}", getName(), counter.get() + 1, line); } if (reporter != null) { @@ -1678,10 +1704,10 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, // add a copy of the received exchange addReceivedExchange(copy); // and then increment counter after adding received exchange - ++counter; + final int receivedCounter = counter.incrementAndGet(); - Processor processor = processors.get(getReceivedCounter()) != null - ? processors.get(getReceivedCounter()) : defaultProcessor; + Processor processor = processors.get(receivedCounter) != null + ? processors.get(receivedCounter) : defaultProcessor; if (processor != null) { try { @@ -1708,8 +1734,8 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, receivedExchanges.add(copy); } else { // okay there is some sort of limitations, so figure out what to retain - if (retainFirst > 0 && counter < retainFirst) { - // store a copy as its within the retain first limitation + if (retainFirst > 0 && counter.get() < retainFirst) { + // store a copy as it is within the retain first limitation receivedExchanges.add(copy); } else if (retainLast > 0) { // remove the oldest from the last retained boundary, @@ -1760,12 +1786,32 @@ public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint, } } + /** + * Asserts that the given {@code predicate} is {@code true}, if not an {@code AssertionError} is raised with the + * give message. + * + * @param message the message to use in case of a failure. + * @param predicate the predicate allowing to determinate if it is a failure or not. + */ protected void assertTrue(String message, boolean predicate) { if (!predicate) { fail(message); } } + /** + * Asserts that the given {@code predicate} is {@code false}, if not an {@code AssertionError} is raised with the + * give message. + * + * @param message the message to use in case of a failure. + * @param predicate the predicate allowing to determinate if it is a failure or not. + */ + protected void assertFalse(String message, boolean predicate) { + if (predicate) { + fail(message); + } + } + protected void fail(Object message) { if (LOG.isDebugEnabled()) { List<Exchange> list = getReceivedExchanges();
