Messages dropped with ActiveMQ/JMS component and Resequencer pattern
---------------------------------------------------------------------
Key: CAMEL-1201
URL: https://issues.apache.org/activemq/browse/CAMEL-1201
Project: Apache Camel
Issue Type: Bug
Components: camel-activemq, camel-jms, eip
Affects Versions: 1.5.0
Reporter: Bruce Snyder
Assignee: Hadrian Zbarcea
I've created a simple class to test the resequencer as listed below:
{code}
public final class MyCamelResequencer {

    private static final Logger LOG = Logger.getLogger(MyCamelResequencer.class);

    private MyCamelResequencer() {}

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();

        // Trace every exchange so we can see which nodes each message reaches
        Tracer tracer = new Tracer();
        tracer.getFormatter().setShowBreadCrumb(false);
        tracer.getFormatter().setShowNode(true);
        context.addInterceptStrategy(tracer);

        String brokerUrl = "vm://localhost?broker.persistent=false";
        // String brokerUrl = "tcp://localhost:61616";
        context.addComponent("activemq", ActiveMQComponent.activeMQComponent(brokerUrl));

        context.addRoutes(new RouteBuilder() {
            public void configure() {
                // Only one route is active per run; the others are the variants tested
                // from("seda:FOO").to("log:PLANETS");
                // from("seda:FOO").resequencer(header("seqnum")).to("log:PLANETS");
                // from("activemq:TEST.IN").to("log:PLANETS");
                from("activemq:TEST.IN").resequencer(header("seqnum")).to("log:PLANETS");
                // from("activemq:TEST.IN").resequencer(header("seq")).to("activemq:TEST.OUT");
            }
        });

        ProducerTemplate template = context.createProducerTemplate();
        context.start();

        List<String> planets = new ArrayList<String>(8);
        planets.add("Mercury");
        planets.add("Venus");
        planets.add("Earth");
        planets.add("Mars");
        planets.add("Jupiter");
        planets.add("Saturn");
        planets.add("Uranus");
        planets.add("Neptune");

        List<Integer> numbers = new ArrayList<Integer>(8);
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);
        numbers.add(4);
        numbers.add(5);
        numbers.add(6);
        numbers.add(7);
        numbers.add(8);

        // Give each planet a unique, randomly chosen sequence number from 1 to 8
        for (String planet : planets) {
            Collections.shuffle(numbers);
            int seqnum = numbers.remove(0);
            String message = "I am the planet " + planet + " with seqnum: " + seqnum;
            LOG.info("Sending message: " + message);
            template.sendBodyAndHeader("activemq:TEST.IN", message, "seqnum", seqnum);
            // template.sendBodyAndHeader("seda:FOO", message, "seqnum", seqnum);
        }

        // Leave time for the route to consume and resequence the messages
        Thread.sleep(10000);
        LOG.info("Shutting down the Camel context");
        context.stop();
        System.exit(1);
    }
}
{code}
The following are my observations:
* SEDA -> log receives all messages
* SEDA -> resequencer -> log receives all messages
* ActiveMQ -> log receives all messages
* ActiveMQ -> resequencer -> log does not receive all messages
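For comparison, the result expected from the failing route can be sketched in plain Java. This is only an illustration of batch resequencing on the seqnum header, not Camel's Resequencer implementation; the class names PlanetMessage and ResequencerSketch and the method sentInLoggedRun are hypothetical, and the seqnum values are the ones shown on the "Sending message" lines of the run logged in this report.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical stand-in for a message: a body plus its "seqnum" header. */
final class PlanetMessage {
    final String body;
    final int seqnum;

    PlanetMessage(String body, int seqnum) {
        this.body = body;
        this.seqnum = seqnum;
    }
}

/** Illustration only: sorts one complete batch by seqnum and drops nothing. */
final class ResequencerSketch {
    private ResequencerSketch() {}

    /** Returns a new list holding every message of the batch in ascending seqnum order. */
    static List<PlanetMessage> resequence(List<PlanetMessage> batch) {
        List<PlanetMessage> ordered = new ArrayList<>(batch);
        ordered.sort(Comparator.comparingInt((PlanetMessage m) -> m.seqnum));
        return ordered;
    }

    /** The eight messages in the order they were sent in the logged run. */
    static List<PlanetMessage> sentInLoggedRun() {
        List<PlanetMessage> sent = new ArrayList<>();
        sent.add(new PlanetMessage("Mercury", 1));
        sent.add(new PlanetMessage("Venus", 3));
        sent.add(new PlanetMessage("Earth", 6));
        sent.add(new PlanetMessage("Mars", 2));
        sent.add(new PlanetMessage("Jupiter", 4));
        sent.add(new PlanetMessage("Saturn", 8));
        sent.add(new PlanetMessage("Uranus", 7));
        sent.add(new PlanetMessage("Neptune", 5));
        return sent;
    }

    public static void main(String[] args) {
        // Print one line per message: all eight should appear, ordered 1 to 8
        for (PlanetMessage m : resequence(sentInLoggedRun())) {
            System.out.println(m.seqnum + " " + m.body);
        }
    }
}
```

Under these assumptions, the log endpoint should show all eight planets, starting with Mercury (1) and ending with Saturn (8).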
With the Tracer interceptor enabled, I can see that the resequencer doesn't
even receive all messages from the ActiveMQ component:
{panel}
[ main] DefaultCamelContext INFO JMX
enabled. Using InstrumentationLifecycleStrategy.
[ main] BrokerService INFO Using
Persistence Adapter: MemoryPersistenceAdapter
[ main] BrokerService INFO ActiveMQ
null JMS Message Broker (localhost) is starting
[ main] BrokerService INFO For help
or more information please see: http://activemq.apache.org/
[ JMX connector] ManagementContext WARN Failed to
start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]:
javax.naming.NameAlreadyBoundException: jmxrmi [Root exception is
java.rmi.AlreadyBoundException: jmxrmi]
[ main] BrokerService INFO ActiveMQ
JMS Message Broker (localhost,
ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) started
[ main] TransportConnector INFO Connector
vm://localhost Started
[ main] MyCamelResequencer INFO Sending
message: I am the planet Mercury with seqnum: 1
[ main] MyCamelResequencer INFO Sending
message: I am the planet Venus with seqnum: 3
[ main] MyCamelResequencer INFO Sending
message: I am the planet Earth with seqnum: 6
[ main] MyCamelResequencer INFO Sending
message: I am the planet Mars with seqnum: 2
[ main] MyCamelResequencer INFO Sending
message: I am the planet Jupiter with seqnum: 4
[ main] MyCamelResequencer INFO Sending
message: I am the planet Saturn with seqnum: 8
[ main] MyCamelResequencer INFO Sending
message: I am the planet Uranus with seqnum: 7
[ main] MyCamelResequencer INFO Sending
message: I am the planet Neptune with seqnum: 5
[aultMessageListenerContainer-1] TraceInterceptor INFO ->
interceptor1
Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[
[] -> [To[log:PLANETS]]]]),
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{}
Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871918,
JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=1,
JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:1,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Mercury with seqnum: 1
[aultMessageListenerContainer-1] TraceInterceptor INFO ->
resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly
Properties:{CamelCauseException=null} Headers:{JMSDestination=queue://TEST.IN,
JMSTimestamp=1229973871918, JMSRedelivered=false, JMSExpiration=0,
JMSType=null, seqnum=1, JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:1,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Mercury with seqnum: 1
[aultMessageListenerContainer-2] TraceInterceptor INFO ->
interceptor1
Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[
[] -> [To[log:PLANETS]]]]),
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{}
Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871930,
JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=6,
JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:3,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Earth with seqnum: 6
[aultMessageListenerContainer-2] TraceInterceptor INFO ->
resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly
Properties:{CamelCauseException=null} Headers:{JMSDestination=queue://TEST.IN,
JMSTimestamp=1229973871930, JMSRedelivered=false, JMSExpiration=0,
JMSType=null, seqnum=6, JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:3,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Earth with seqnum: 6
[aultMessageListenerContainer-3] TraceInterceptor INFO ->
interceptor1
Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[
[] -> [To[log:PLANETS]]]]),
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{}
Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871930,
JMSRedelivered=true, JMSExpiration=0, JMSType=null, seqnum=2, JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:4,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Mars with seqnum: 2
[aultMessageListenerContainer-3] TraceInterceptor INFO ->
resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly
Properties:{CamelCauseException=null} Headers:{JMSDestination=queue://TEST.IN,
JMSTimestamp=1229973871930, JMSRedelivered=true, JMSExpiration=0, JMSType=null,
seqnum=2, JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:4,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Mars with seqnum: 2
[aultMessageListenerContainer-4] TraceInterceptor INFO ->
interceptor1
Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[
[] -> [To[log:PLANETS]]]]),
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{}
Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871936,
JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=4,
JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:5,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Jupiter with seqnum: 4
[aultMessageListenerContainer-4] TraceInterceptor INFO ->
resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly
Properties:{CamelCauseException=null} Headers:{JMSDestination=queue://TEST.IN,
JMSTimestamp=1229973871936, JMSRedelivered=false, JMSExpiration=0,
JMSType=null, seqnum=4, JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:5,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Jupiter with seqnum: 4
[aultMessageListenerContainer-5] TraceInterceptor INFO ->
interceptor1
Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[
[] -> [To[log:PLANETS]]]]),
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{}
Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871938,
JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=7,
JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:7,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Uranus with seqnum: 7
[aultMessageListenerContainer-5] TraceInterceptor INFO ->
resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly
Properties:{CamelCauseException=null} Headers:{JMSDestination=queue://TEST.IN,
JMSTimestamp=1229973871938, JMSRedelivered=false, JMSExpiration=0,
JMSType=null, seqnum=7, JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:7,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Uranus with seqnum: 7
[aultMessageListenerContainer-6] TraceInterceptor INFO ->
interceptor1
Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[
[] -> [To[log:PLANETS]]]]),
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{}
Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871938,
JMSRedelivered=true, JMSExpiration=0, JMSType=null, seqnum=5, JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:8,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Neptune with seqnum: 5
[aultMessageListenerContainer-6] TraceInterceptor INFO ->
resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly
Properties:{CamelCauseException=null} Headers:{JMSDestination=queue://TEST.IN,
JMSTimestamp=1229973871938, JMSRedelivered=true, JMSExpiration=0, JMSType=null,
seqnum=5, JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:8,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Neptune with seqnum: 5
[[log:PLANETS]]] Polling Thread] TraceInterceptor INFO -> to1
To[log:PLANETS] InOnly Properties:{} Headers:{JMSDestination=queue://TEST.IN,
JMSTimestamp=1229973871929, JMSRedelivered=false, JMSExpiration=0,
JMSType=null, seqnum=3, JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:2,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Venus with seqnum: 3
[[log:PLANETS]]] Polling Thread] PLANETS INFO
Exchange[BodyType:String, Body:I am the planet Venus with seqnum: 3]
[[log:PLANETS]]] Polling Thread] TraceInterceptor INFO -> to1
To[log:PLANETS] InOnly Properties:{} Headers:{JMSDestination=queue://TEST.IN,
JMSTimestamp=1229973871937, JMSRedelivered=false, JMSExpiration=0,
JMSType=null, seqnum=8, JMSXGroupID=null,
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:6,
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4}
BodyType:String Body:I am the planet Saturn with seqnum: 8
[[log:PLANETS]]] Polling Thread] PLANETS INFO
Exchange[BodyType:String, Body:I am the planet Saturn with seqnum: 8]
[ main] MyCamelResequencer INFO Shutting
down the Camel context
[ ActiveMQ ShutdownHook] BrokerService INFO ActiveMQ
Message Broker (localhost,
ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) is shutting down
[ ActiveMQ ShutdownHook] TransportConnector INFO Connector
vm://localhost Stopped
[ ActiveMQ ShutdownHook] BrokerService INFO ActiveMQ
JMS Message Broker (localhost,
ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) stopped
{panel}
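To make the trace easier to read, the tally of messages that reached the log endpoint can be worked out as a set difference. This is a minimal sketch: the class name TraceTally and its method are hypothetical, and the delivered set is taken from the two "PLANETS INFO Exchange[...]" lines in the trace above (seqnum 3 and seqnum 8).

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: lists sequence numbers that were sent but never delivered. */
final class TraceTally {
    private TraceTally() {}

    /** Returns the members of sent that are absent from delivered, in ascending order. */
    static Set<Integer> missing(Set<Integer> sent, Set<Integer> delivered) {
        Set<Integer> result = new TreeSet<>(sent);
        result.removeAll(delivered);
        return result;
    }

    public static void main(String[] args) {
        // All eight sequence numbers were sent by the test class
        Set<Integer> sent = new TreeSet<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        // seqnum values that appear on the PLANETS log lines of the trace
        Set<Integer> delivered = new TreeSet<>(Arrays.asList(3, 8));
        System.out.println("Never reached log:PLANETS: " + missing(sent, delivered));
    }
}
```

For this run the sketch prints `Never reached log:PLANETS: [1, 2, 4, 5, 6, 7]`, so six of the eight messages were lost before the log endpoint.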