Camel versions tested: 2.16 - 2.18.3 Current Maven dependencies: org.apache.camel:camel-test 2.18.1 org.apache.camel:camel-sjms 2.17.3
SHORT VERSION: When using SJMS (et al?) with an aggregator and/or splitter in the route, shutting down either throws an error or tosses out messages. LONG VERSION: When using an SJMS consumer to consume from a queue, with a route that has an aggregator in it, I inevitably lose messages when the route stops. The two obvious documented aggregator modifiers do not work: *forceCompletionOnStop* - results in a RejectedExecutionException error because the underlying thread pools are stopped/closed before the "prepareShutdown" method is called on the aggregator (which is when the outstanding aggregations are forced to complete and the results are handed to the route for processing). *completeAllOnStop* - results in the route logging the number of outstanding messages every second (the number never changes) until the (500 second?) timeout is reached, at which point the route is forced to shut down and the messages are tossed out. Presumably because there is no active thread pool available to handle the messages. Without either of these two modifiers on the aggregator, it just tosses out any unfinished aggregations on shutdown. Here's a sample test... It probably isn't ideally written, but it does illustrate the issue... import org.apache.activemq.junit.EmbeddedActiveMQBroker; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.sjms.SjmsComponent; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.util.toolbox.AggregationStrategies; import org.junit.Rule; import org.junit.Test; /** * Created by bryan.love on 4/25/17. */ public class SjmsBatchTest { @Rule public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker(); CamelContext context = new DefaultCamelContext(); ProducerTemplate template = context.createProducerTemplate(); @Test public void testBatch() throws Exception { SjmsComponent comp = new SjmsComponent(); comp.setConnectionFactory(broker.createConnectionFactory()); context.addComponent("sjms", comp); //context.setShutdownStrategy(new MyShutdownStrategy(context)); RouteBuilder rb = new RouteBuilder() { @Override public void configure() throws Exception { from("sjms:queue:test-in") .aggregate(header("CamelFileName"), AggregationStrategies.groupedExchange()) .id("fileNameAggProcessor") .completionInterval(10000) // wait $b .completionSize(50) // wait for $batchSize messages to aggregate .forceCompletionOnStop() .filter(header("CamelFileName").isNotNull()) .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { System.out.println("foo"); } }); } }; context.addRoutes(rb); context.start(); template.setDefaultEndpointUri("sjms:queue:test-in"); template.sendBodyAndHeader("some body", "CamelFileName", "someFileName"); Thread.sleep(1000); context.stop(); } } -- View this message in context: http://camel.465427.n5.nabble.com/SJMS-RejectedExecutionException-bug-during-shutdown-tp5798335.html Sent from the Camel Development mailing list archive at Nabble.com.