Zdeněk Obst created CAMEL-9606:
----------------------------------
Summary: SJMS Consumer-Producer in transaction
Key: CAMEL-9606
URL: https://issues.apache.org/jira/browse/CAMEL-9606
Project: Camel
Issue Type: Bug
Components: camel-sjms
Affects Versions: 2.16.2, 2.15.4
Reporter: Zdeněk Obst
I'm not 100% sure this is a bug, but it feels that way from a conversation I
had on the mailing lists.
I'm trying to ensure transactional processing between an SJMS consumer and
producer (e.g. by using the same JMS session).
In other words, this simple case:
1. Prepare a large number of JMS messages in the broker (e.g. 1000).
2. Use a Camel route from an input queue to an output queue with transacted=true.
3. Start the context (which starts consuming messages) and kill the Java process at any time.
When I kill the process, I would expect the sum of messages in the input and
output queues to be 1000, i.e. the transaction works. What actually happens is
that I always end up with 1001 or more messages. Maybe it is a misconfiguration
or a misunderstanding of how SJMS works.
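To make the expectation concrete, the invariant can be written as a small check. This is only an illustrative sketch: the class and method names (QueueInvariant, surplus, holds) are hypothetical and are not part of Camel, SJMS, or the reproduction code. The queue depths are assumed to be read from the broker by some other means, such as the ActiveMQ web console.

```java
// Hypothetical helper, not part of Camel or ActiveMQ.
class QueueInvariant {

    // Number of messages that exist beyond what was originally submitted.
    // 0 means nothing was lost or duplicated; a positive value means duplicates.
    static long surplus(long initial, long inNow, long outNow) {
        return inNow + outNow - initial;
    }

    // True when the transactional expectation holds:
    // initial == inNow + outNow
    static boolean holds(long initial, long inNow, long outNow) {
        return surplus(initial, inNow, outNow) == 0;
    }
}
```

With the numbers I typically observe (1000 submitted, 501 left in the input queue, 500 in the output queue), surplus(1000, 501, 500) is 1, so holds(...) is false.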
Here is the sample code I used for reproduction (using ActiveMQ):
{code:java}
import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.sjms.SjmsComponent;
import org.apache.camel.component.sjms.jms.ConnectionFactoryResource;
import org.apache.camel.impl.DefaultCamelContext;

public class SjmsTransaction {

    public static void main(String[] args) throws Exception {
        RouteBuilder rb = new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Log any exception raised while routing
                onException(Exception.class)
                    .process(systemOut("Exception!!"));

                // Transacted consumer (5 concurrent consumers) to transacted producer
                from("sjms:queue:test-in?transacted=true&consumerCount=5")
                    .process(systemOut("Processing"))
                    .to("sjms:queue:test-out?transacted=true")
                    .process(systemOut("Processed"));
            }
        };

        CamelContext context = new DefaultCamelContext();
        addJmsComponent(context);
        context.addRoutes(rb);

        System.out.println("=====> Starting context");
        context.start();
        // Now the context runs and consumes messages. When I kill the
        // application by force at any time, I expect this to hold:
        //   <#messagesInInputAtBeginning> == <#messagesInInputNow> + <#messagesInOutputNow>
        // What actually happens is that the left side is always smaller
        // (e.g. I submitted 1000 messages, out has 500, in has 501).
    }

    // Registers the SJMS component backed by a pool of 5 ActiveMQ connections
    private static void addJmsComponent(CamelContext context) {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        ConnectionFactoryResource connResource = new ConnectionFactoryResource(5, factory);
        SjmsComponent comp = new SjmsComponent();
        comp.setConnectionResource(connResource);
        context.addComponent("sjms", comp);
    }

    // Processor that prints the exchange ID followed by the given message
    private static Processor systemOut(final String message) {
        return new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                System.out.println(exchange.getExchangeId() + ": " + message);
            }
        };
    }
}
{code}
Note that I tried various combinations of acknowledgeMode and the InOnly/InOut
exchange patterns, but without luck.
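One possible explanation, stated purely as an assumption and not confirmed against the SJMS source, is that the consumer and the producer commit on two separate JMS sessions. The in-memory model below is a sketch of that hypothesis only. It uses no JMS or Camel API, and TwoSessionModel and its members are hypothetical names. It shows how a kill between the producer commit and the consumer commit leaves the same message in both queues, whereas a single shared commit would not.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical in-memory model of the two queues; not JMS or Camel API.
class TwoSessionModel {
    final Deque<String> in = new ArrayDeque<>();
    final Deque<String> out = new ArrayDeque<>();

    TwoSessionModel(int messages) {
        for (int i = 0; i < messages; i++) {
            in.add("msg-" + i);
        }
    }

    // Moves messages from "in" to "out" and simulates a process kill
    // while the crashAtMessage-th message is in flight.
    void run(int crashAtMessage, boolean sharedSession) {
        int n = 0;
        while (!in.isEmpty()) {
            String msg = in.peek(); // received, but the receive is not yet committed
            n++;
            if (sharedSession) {
                // Assumed shared session: one commit covers receive and send.
                if (n == crashAtMessage) {
                    return; // killed before the commit: nothing is applied
                }
                in.poll();
                out.add(msg);
            } else {
                // Assumed separate sessions: the send is committed first.
                out.add(msg);
                if (n == crashAtMessage) {
                    return; // killed before the receive commit: msg stays in "in"
                }
                in.poll();
            }
        }
    }

    int total() {
        return in.size() + out.size();
    }
}
```

In this model, new TwoSessionModel(1000) followed by run(500, false) ends with 501 messages in the input queue and 500 in the output queue, which matches the numbers I see. With run(500, true) the total stays at 1000. With consumerCount=5 there could be several messages in flight at the moment of the kill, which would be consistent with seeing more than one duplicate.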
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)