Messages dropped with ActiveMQ/JMS component and Resequencer pattern 
---------------------------------------------------------------------

                 Key: CAMEL-1201
                 URL: https://issues.apache.org/activemq/browse/CAMEL-1201
             Project: Apache Camel
          Issue Type: Bug
          Components: camel-activemq, camel-jms, eip
    Affects Versions: 1.5.0
            Reporter: Bruce Snyder
            Assignee: Hadrian Zbarcea


I've created a simple class to test the resequencer as listed below: 

{code}
public final class MyCamelResequencer {
    private static final Logger LOG = 
Logger.getLogger(MyCamelResequencer.class);

    private MyCamelResequencer() {}

    public static void main(String args[]) throws Exception {
        CamelContext context = new DefaultCamelContext();
        Tracer tracer = new Tracer();
        tracer.getFormatter().setShowBreadCrumb(false);
        tracer.getFormatter().setShowNode(true);
        context.addInterceptStrategy(tracer);
       
        String brokerUrl = "vm://localhost?broker.persistent=false";
//        String brokerUrl = "tcp://localhost:61616";
        context.addComponent("activemq", 
ActiveMQComponent.activeMQComponent(brokerUrl));
        context.addRoutes(new RouteBuilder() {
            public void configure() {
//                from("seda:FOO").to("log:PLANETS");
//                
from("seda:FOO").resequencer(header("seqnum")).to("log:PLANETS");
//                from("activemq:TEST.IN").to("log:PLANETS");
                
from("activemq:TEST.IN").resequencer(header("seqnum")).to("log:PLANETS");
//                
from("activemq:TEST.IN").resequencer(header("seq")).to("activemq:TEST.OUT");
            }
        });

        ProducerTemplate template = context.createProducerTemplate();

        context.start();

        List<String> planets = new ArrayList(8);
        planets.add("Mercury");
        planets.add("Venus");
        planets.add("Earth");
        planets.add("Mars");
        planets.add("Jupiter");
        planets.add("Saturn");
        planets.add("Uranus");
        planets.add("Neptune");
        
        List<Integer> numbers = new ArrayList<Integer>(8);
        numbers.add(1);
        numbers.add(2);
        numbers.add(3);
        numbers.add(4);
        numbers.add(5);
        numbers.add(6);
        numbers.add(7);
        numbers.add(8);
        
        for (String planet: planets) {
            Collections.shuffle(numbers);
            int seqnum = numbers.remove(0);
            String message = "I am the planet " + planet + " with seqnum: " + 
seqnum;
            LOG.info("Sending message: " + message);
            template.sendBodyAndHeader("activemq:TEST.IN", message, "seqnum", 
seqnum);
//            template.sendBodyAndHeader("seda:FOO", message, "seqnum", seqnum);
        }

        Thread.sleep(10000);
        LOG.info("Shutting down the Camel context");
        context.stop();
        System.exit(1);
    }
{code}

The following are my observations: 

* SEDA -> log receives all messages 
* SEDA -> resequencer -> log receives all messages 
* ActiveMQ -> log receives all messages 
* ActiveMQ -> resequencer -> log does not receive all messages 

Upon enabling the Tracer interceptor, I'm able to see that it doesn't even 
receive all messages from the ActiveMQ component: 

{panel}
[                          main] DefaultCamelContext            INFO  JMX 
enabled. Using InstrumentationLifecycleStrategy.
[                          main] BrokerService                  INFO  Using 
Persistence Adapter: MemoryPersistenceAdapter
[                          main] BrokerService                  INFO  ActiveMQ 
null JMS Message Broker (localhost) is starting
[                          main] BrokerService                  INFO  For help 
or more information please see: http://activemq.apache.org/
[                 JMX connector] ManagementContext              WARN  Failed to 
start jmx connector: Cannot bind to URL [rmi://localhost:1099/jmxrmi]: 
javax.naming.NameAlreadyBoundException: jmxrmi [Root exception is 
java.rmi.AlreadyBoundException: jmxrmi]
[                          main] BrokerService                  INFO  ActiveMQ 
JMS Message Broker (localhost, 
ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) started
[                          main] TransportConnector             INFO  Connector 
vm://localhost Started
[                          main] MyCamelResequencer             INFO  Sending 
message: I am the planet Mercury with seqnum: 1
[                          main] MyCamelResequencer             INFO  Sending 
message: I am the planet Venus with seqnum: 3
[                          main] MyCamelResequencer             INFO  Sending 
message: I am the planet Earth with seqnum: 6
[                          main] MyCamelResequencer             INFO  Sending 
message: I am the planet Mars with seqnum: 2
[                          main] MyCamelResequencer             INFO  Sending 
message: I am the planet Jupiter with seqnum: 4
[                          main] MyCamelResequencer             INFO  Sending 
message: I am the planet Saturn with seqnum: 8
[                          main] MyCamelResequencer             INFO  Sending 
message: I am the planet Uranus with seqnum: 7
[                          main] MyCamelResequencer             INFO  Sending 
message: I am the planet Neptune with seqnum: 5
[aultMessageListenerContainer-1] TraceInterceptor               INFO  -> 
interceptor1 
Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ 
[] -> [To[log:PLANETS]]]]), 
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], 
RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} 
Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871918, 
JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=1, 
JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:1, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Mercury with seqnum: 1
[aultMessageListenerContainer-1] TraceInterceptor               INFO  -> 
resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly 
Properties:{CamelCauseException=null} Headers:{JMSDestination=queue://TEST.IN, 
JMSTimestamp=1229973871918, JMSRedelivered=false, JMSExpiration=0, 
JMSType=null, seqnum=1, JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:1, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Mercury with seqnum: 1
[aultMessageListenerContainer-2] TraceInterceptor               INFO  -> 
interceptor1 
Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ 
[] -> [To[log:PLANETS]]]]), 
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], 
RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} 
Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871930, 
JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=6, 
JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:3, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Earth with seqnum: 6
[aultMessageListenerContainer-2] TraceInterceptor               INFO  -> 
resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly 
Properties:{CamelCauseException=null} Headers:{JMSDestination=queue://TEST.IN, 
JMSTimestamp=1229973871930, JMSRedelivered=false, JMSExpiration=0, 
JMSType=null, seqnum=6, JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:3, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Earth with seqnum: 6
[aultMessageListenerContainer-3] TraceInterceptor               INFO  -> 
interceptor1 
Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ 
[] -> [To[log:PLANETS]]]]), 
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], 
RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} 
Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871930, 
JMSRedelivered=true, JMSExpiration=0, JMSType=null, seqnum=2, JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:4, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Mars with seqnum: 2
[aultMessageListenerContainer-3] TraceInterceptor               INFO  -> 
resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly 
Properties:{CamelCauseException=null} Headers:{JMSDestination=queue://TEST.IN, 
JMSTimestamp=1229973871930, JMSRedelivered=true, JMSExpiration=0, JMSType=null, 
seqnum=2, JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:4, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Mars with seqnum: 2
[aultMessageListenerContainer-4] TraceInterceptor               INFO  -> 
interceptor1 
Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ 
[] -> [To[log:PLANETS]]]]), 
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], 
RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} 
Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871936, 
JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=4, 
JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:5, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Jupiter with seqnum: 4
[aultMessageListenerContainer-4] TraceInterceptor               INFO  -> 
resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly 
Properties:{CamelCauseException=null} Headers:{JMSDestination=queue://TEST.IN, 
JMSTimestamp=1229973871936, JMSRedelivered=false, JMSExpiration=0, 
JMSType=null, seqnum=4, JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:5, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Jupiter with seqnum: 4
[aultMessageListenerContainer-5] TraceInterceptor               INFO  -> 
interceptor1 
Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ 
[] -> [To[log:PLANETS]]]]), 
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], 
RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} 
Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871938, 
JMSRedelivered=false, JMSExpiration=0, JMSType=null, seqnum=7, 
JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:7, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Uranus with seqnum: 7
[aultMessageListenerContainer-5] TraceInterceptor               INFO  -> 
resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly 
Properties:{CamelCauseException=null} Headers:{JMSDestination=queue://TEST.IN, 
JMSTimestamp=1229973871938, JMSRedelivered=false, JMSExpiration=0, 
JMSType=null, seqnum=7, JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:7, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Uranus with seqnum: 7
[aultMessageListenerContainer-6] TraceInterceptor               INFO  -> 
interceptor1 
Interceptor[Delegate(DeadLetterChannel[Delegate(TraceInterceptor[Resequencer[ 
[] -> [To[log:PLANETS]]]]), 
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error], 
RedeliveryPolicy[maximumRedeliveries=6]])] InOnly Properties:{} 
Headers:{JMSDestination=queue://TEST.IN, JMSTimestamp=1229973871938, 
JMSRedelivered=true, JMSExpiration=0, JMSType=null, seqnum=5, JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:8, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Neptune with seqnum: 5
[aultMessageListenerContainer-6] TraceInterceptor               INFO  -> 
resequencer1 Resequencer[ [] -> [To[log:PLANETS]]] InOnly 
Properties:{CamelCauseException=null} Headers:{JMSDestination=queue://TEST.IN, 
JMSTimestamp=1229973871938, JMSRedelivered=true, JMSExpiration=0, JMSType=null, 
seqnum=5, JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:8, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Neptune with seqnum: 5
[[log:PLANETS]]] Polling Thread] TraceInterceptor               INFO  -> to1 
To[log:PLANETS] InOnly Properties:{} Headers:{JMSDestination=queue://TEST.IN, 
JMSTimestamp=1229973871929, JMSRedelivered=false, JMSExpiration=0, 
JMSType=null, seqnum=3, JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:2, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Venus with seqnum: 3
[[log:PLANETS]]] Polling Thread] PLANETS                        INFO  
Exchange[BodyType:String, Body:I am the planet Venus with seqnum: 3]
[[log:PLANETS]]] Polling Thread] TraceInterceptor               INFO  -> to1 
To[log:PLANETS] InOnly Properties:{} Headers:{JMSDestination=queue://TEST.IN, 
JMSTimestamp=1229973871937, JMSRedelivered=false, JMSExpiration=0, 
JMSType=null, seqnum=8, JMSXGroupID=null, 
JMSMessageID=ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-2:1:1:1:6, 
JMSDeliveryMode=2, JMSCorrelationID=null, JMSReplyTo=null, JMSPriority=4} 
BodyType:String Body:I am the planet Saturn with seqnum: 8
[[log:PLANETS]]] Polling Thread] PLANETS                        INFO  
Exchange[BodyType:String, Body:I am the planet Saturn with seqnum: 8]
[                          main] MyCamelResequencer             INFO  Shutting 
down the Camel context
[         ActiveMQ ShutdownHook] BrokerService                  INFO  ActiveMQ 
Message Broker (localhost, 
ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) is shutting down
[         ActiveMQ ShutdownHook] TransportConnector             INFO  Connector 
vm://localhost Stopped
[         ActiveMQ ShutdownHook] BrokerService                  INFO  ActiveMQ 
JMS Message Broker (localhost, 
ID:174-155-37-223.pools.spcsdns.net-58792-1229973871555-0:0) stopped

{panel}

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to