This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit cce62c2ce9977eeeb95305dd1611e07a566b0624 Author: Otavio Rodolfo Piske <angusyo...@gmail.com> AuthorDate: Fri Jul 14 14:27:27 2023 +0200 (chores) camel-core: RecipientListWithSimpleExpressionTest test fixes and cleanups - removed unrelated test - cleaned up the code - isolate the test, as it requires more resources than the rest of the tests - improve overall coordination of concurrent parties --- .../RecipientListWithSimpleExpressionTest.java | 109 +++++++-------------- 1 file changed, 36 insertions(+), 73 deletions(-) diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java index fbda46e7a9a..acf289640f2 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListWithSimpleExpressionTest.java @@ -16,108 +16,71 @@ */ package org.apache.camel.processor; -import java.util.concurrent.ExecutorService; +import java.time.Duration; import java.util.concurrent.Executors; +import java.util.concurrent.Phaser; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.camel.ContextTestSupport; -import org.apache.camel.Header; import org.apache.camel.builder.RouteBuilder; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +@Isolated("This test creates a larger thread pool, which may be too much on slower hosts") public class RecipientListWithSimpleExpressionTest extends ContextTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(RecipientListWithSimpleExpressionTest.class); + private final ScheduledExecutorService executors = Executors.newScheduledThreadPool(10); + private final Phaser phaser = new Phaser(50); @Override - public boolean isUseRouteBuilder() { - return false; - } - - @Test - public void testRecipientList() throws Exception { - context.addRoutes(new RouteBuilder() { + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { @Override - public void configure() throws Exception { + public void configure() { from("direct:start").recipientList(simple("mock:${in.header.queue}")); } - }); - context.start(); - template.start(); + }; + } - for (int i = 0; i < 10; i++) { - getMockEndpoint("mock:" + i).expectedMessageCount(50); - } + @BeforeEach + void sendMessages() { + // it may take a little while for the context to start on slower hosts + Awaitility.await().atMost(Duration.ofSeconds(2)).until(() -> context.getUptimeMillis() > 1000); // use concurrent producers to send a lot of messages - ExecutorService executors = Executors.newFixedThreadPool(10); for (int i = 0; i < 50; i++) { - executors.execute(new Runnable() { + final Runnable runOverRunnable = new Runnable() { + int i; + + @Override public void run() { - for (int i = 0; i < 10; i++) { - try { - template.sendBodyAndHeader("direct:start", "Hello " + i, "queue", i); - Thread.sleep(5); - } catch (Exception e) { - // ignore - } + template.sendBodyAndHeader("direct:start", "Hello " + i, "queue", i); + i++; + if (i == 10) { + i = 0; } } - }); + }; + executors.scheduleAtFixedRate(runOverRunnable, 0, 50, TimeUnit.MILLISECONDS); + phaser.arrive(); } - - assertMockEndpointsSatisfied(); - executors.shutdownNow(); } - public static class MyBeanRouter { - - @org.apache.camel.RecipientList - public String route(@Header("queue") String queue) { - return "mock:" + queue; - } - } @Test - public void testStatic() throws Exception { - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - from("direct:0").to("mock:0"); - from("direct:1").to("mock:1"); - from("direct:2").to("mock:2"); - from("direct:3").to("mock:3"); - from("direct:4").to("mock:4"); - from("direct:5").to("mock:5"); - from("direct:6").to("mock:6"); - from("direct:7").to("mock:7"); - from("direct:8").to("mock:8"); - from("direct:9").to("mock:9"); - } - }); - context.start(); - template.start(); - + public void testRecipientList() throws InterruptedException, TimeoutException { for (int i = 0; i < 10; i++) { getMockEndpoint("mock:" + i).expectedMessageCount(50); } - // use concurrent producers to send a lot of messages - ExecutorService executors = Executors.newFixedThreadPool(10); - for (int i = 0; i < 50; i++) { - executors.execute(new Runnable() { - public void run() { - for (int i = 0; i < 10; i++) { - try { - template.sendBodyAndHeader("direct:" + i, "Hello " + i, "queue", i); - Thread.sleep(5); - } catch (Exception e) { - // ignore - } - } - } - }); - } - + phaser.awaitAdvanceInterruptibly(0, 5000, TimeUnit.SECONDS); assertMockEndpointsSatisfied(); executors.shutdownNow(); } - }