Camel versions tested: 2.16 - 2.18.3
Current Maven dependencies:
org.apache.camel:camel-test 2.18.1
org.apache.camel:camel-sjms 2.17.3

SHORT VERSION:
When using SJMS (et al?) with an aggregator and/or splitter in the route,
shutting down the route either throws an error or tosses out messages.


LONG VERSION:

When using an SJMS consumer to consume from a queue, with a route that has
an aggregator in it, I inevitably lose messages when the route stops.

The two obvious documented aggregator modifiers do not work:

*forceCompletionOnStop* - results in a RejectedExecutionException error
because the underlying thread pools are stopped/closed before the
"prepareShutdown" method is called on the aggregator (which is when the
outstanding aggregations are forced to complete and the results are handed
to the route for processing).
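To make the ordering problem concrete, here is a minimal JDK-only sketch. It is not Camel code and the class and method names are made up for illustration. It only assumes that the pool behaves like a standard java.util.concurrent ExecutorService, which rejects new tasks once shutdown() has been called.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// JDK-only illustration; ShutdownOrderSketch and its methods are hypothetical
// names, not Camel classes.
final class ShutdownOrderSketch {

    /** Stops the pool first, then tries to hand it the flushed batch. */
    static boolean flushAfterPoolShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown(); // pool stops accepting work before the flush happens
        try {
            pool.submit(() -> System.out.println("processing forced batch"));
            return false; // not reached: a shut-down pool rejects new tasks
        } catch (RejectedExecutionException e) {
            return true;  // same exception type as in the report above
        }
    }

    /** Flushes first and stops the pool afterwards. */
    static boolean flushBeforePoolShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean ran = new AtomicBoolean(false);
        pool.submit(() -> ran.set(true)); // accepted: pool is still running
        pool.shutdown();                  // already-submitted tasks still run
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected when pool stops first: " + flushAfterPoolShutdown());
        System.out.println("completed when flush runs first: " + flushBeforePoolShutdown());
    }
}
```

If the aggregator's forced completion ran before the pools were stopped (the second method), the batch would presumably be processed normally.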

*completeAllOnStop* - results in the route logging the number of outstanding
messages every second (the number never changes) until the (500 second?)
timeout is reached, at which point the route is forced to shut down and the
messages are tossed out, presumably because there is no active thread pool
available to handle them.
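A rough JDK-only sketch of what I think is going on here. This is my guess at the mechanism, not Camel's actual shutdown code, and the names and message text are invented. A wait loop polls a pending counter until it reaches zero or a timeout expires. If nothing is left to drain the counter, the loop can only time out.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only illustration; PendingWaitSketch and awaitDrained are hypothetical
// names, not Camel APIs, and the printed text is not Camel's log output.
final class PendingWaitSketch {

    /**
     * Polls the pending counter until it reaches zero or the timeout expires.
     * Returns true if everything drained, false on timeout or interruption.
     */
    static boolean awaitDrained(AtomicInteger pending, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (pending.get() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out: remaining work would be dropped
            }
            System.out.println("still pending: " + pending.get());
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // One unfinished aggregation and no worker thread left to complete it.
        AtomicInteger pending = new AtomicInteger(1);
        boolean drained = awaitDrained(pending, 300, 100);
        System.out.println("drained=" + drained + ", pending=" + pending.get());
    }
}
```

The timeout and poll interval are shortened here so the example finishes quickly.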

Without either of these two modifiers on the aggregator, it just tosses out
any unfinished aggregations on shutdown.

Here's a sample test... It probably isn't ideally written, but it does
illustrate the issue...


import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.sjms.SjmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.toolbox.AggregationStrategies;
import org.junit.Rule;
import org.junit.Test;

/**
 * Created by bryan.love on 4/25/17.
 */
public class SjmsBatchTest {
    @Rule
    public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker();
    CamelContext context = new DefaultCamelContext();
    ProducerTemplate template = context.createProducerTemplate();

    @Test
    public void testBatch() throws Exception {
        SjmsComponent comp = new SjmsComponent();
        comp.setConnectionFactory(broker.createConnectionFactory());
        context.addComponent("sjms", comp);
        //context.setShutdownStrategy(new MyShutdownStrategy(context));

        RouteBuilder rb = new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("sjms:queue:test-in")
                .aggregate(header("CamelFileName"), AggregationStrategies.groupedExchange())
                    .id("fileNameAggProcessor")
                    .completionInterval(10000)   // wait $b
                    .completionSize(50)          // wait for $batchSize messages to aggregate
                    .forceCompletionOnStop()
                .filter(header("CamelFileName").isNotNull())
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("foo");
                    }
                });
            }
        };
        context.addRoutes(rb);

        context.start();
        template.setDefaultEndpointUri("sjms:queue:test-in");
        template.sendBodyAndHeader("some body", "CamelFileName", "someFileName");
        Thread.sleep(1000);
        context.stop();
    }
}




--
View this message in context: 
http://camel.465427.n5.nabble.com/SJMS-RejectedExecutionException-bug-during-shutdown-tp5798335.html
Sent from the Camel Development mailing list archive at Nabble.com.
