Brad Harvey created CAMEL-15333:
-----------------------------------

             Summary: SJMS transacted producer can lose messages
                 Key: CAMEL-15333
                 URL: https://issues.apache.org/jira/browse/CAMEL-15333
             Project: Camel
          Issue Type: Bug
          Components: camel-sjms
    Affects Versions: 3.4.1
            Reporter: Brad Harvey

Prerequisites: an SJMS producer with transacted=true that is not linked to a shared session (ReturnProducerCallback is chosen in SjmsProducer#process).

Sequence of events in InOnlyProducer.sendMessage:
# The message is sent successfully by producer.getMessageProducer().send(message);
# The producer is returned to the pool by releaseProducerCallback.release(producer);

{code:java}
@Override
public void sendMessage(final Exchange exchange, final AsyncCallback callback, final MessageProducerResources producer, final ReleaseProducerCallback releaseProducerCallback) throws Exception {
    try {
        Message message = getEndpoint().getBinding().makeJmsMessage(exchange, producer.getSession());
        producer.getMessageProducer().send(message);
    } catch (Exception e) {
        exchange.setException(new CamelExchangeException("Unable to complete sending the JMS message", exchange, e));
    } finally {
        releaseProducerCallback.release(producer);
        callback.done(isSynchronous());
    }
}
{code}

When the producer is returned to the pool, the pool may decide that it should be disposed of. See GenericObjectPool#addObjectToPool. The variable shouldDestroy can be set to true for a few reasons, but in my case it was because there were too many idle producers in the pool (_pool.size() >= _maxIdle).

When the producer is destroyed, SjmsProducer$MessageProducerResourcesFactory#destroyObject is called. If the session is transacted, this method rolls it back.

{code:java}
@Override
public void destroyObject(MessageProducerResources model) throws Exception {
    if (model.getMessageProducer() != null) {
        model.getMessageProducer().close();
    }

    if (model.getSession() != null) {
        try {
            if (model.getSession().getTransacted()) {
                try {
                    model.getSession().rollback();
                } catch (Exception e) {
                    // Do nothing. Just make sure we are cleaned up
                }
            }
            model.getSession().close();
        } catch (Exception e) {
            // TODO why is the session closed already?
        }
    }
}
{code}

The only indication in the log that this happened was subtle: some debug-level logging from the JMS client library (qpid in this case).

{code:java}
2020-07-22 11:34:48.259 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms.SjmsProducer : Processing Exchange.id:ID-EXCHANGE-1595392487340-0-3
2020-07-22 11:34:48.260 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms2.Sjms2Endpoint : Creating ConnectionResource with connectionCount: 1 using ConnectionFactory: org.springframework.jms.connection.CachingConnectionFactory@2db98e22
2020-07-22 11:34:49.158 DEBUG||| 27108 --- [AmqpProvider :(1):[amqps://localhost:5672]] o.a.q.j.p.a.AmqpTransactionCoordinator : New TX started: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2
2020-07-22 11:34:49.158 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.qpid.jms.JmsLocalTransactionContext : Begin: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2
2020-07-22 11:34:49.158 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.s.j.c.CachingConnectionFactory : Registering cached JMS Session for mode 0: org.apache.qpid.jms.JmsSession@322f173f
2020-07-22 11:34:49.173 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.s.j.c.CachingConnectionFactory : Registering cached JMS MessageProducer for destination [queuename]: org.apache.qpid.jms.JmsMessageProducer@24ff95c9
2020-07-22 11:34:49.175 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms.SjmsProducer : Sending message synchronously: <?xml version="1.0" encoding="UTF-8"?> (snip)
2020-07-22 11:34:49.201 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.qpid.jms.JmsLocalTransactionContext : Rollback: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2
2020-07-22 11:34:49.207 DEBUG||| 27108 --- [AmqpProvider :(1):[amqps://localhost:5672]] o.a.q.j.p.a.AmqpTransactionCoordinator : New TX started: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:4
2020-07-22 11:34:49.367 DEBUG||| 27108 --- [AmqpProvider :(1):[amqps://localhost:5672]] o.a.q.j.p.a.AmqpTransactionCoordinator : Last TX request succeeded: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2
2020-07-22 11:34:49.367 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.camel.component.sjms.SjmsProducer : Processing Exchange.id:ID-EXCHANGE-1595392487340-0-3 - SUCCESS
2020-07-22 11:34:49.371 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] .c.s.t.SessionTransactionSynchronization : Processing completion of ExchangeId: ID-EXCHANGE-1595392487340-0-3
2020-07-22 11:34:49.371 DEBUG|mdc.routename|ID-EXCHANGE-1595392487340-0-3| 27108 --- [Camel (camel-1) thread #15 - SjmsBatchConsumer] o.a.qpid.jms.JmsLocalTransactionContext : Commit: TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:4
{code}

Even the subsequent session.commit() in SessionTransactionSynchronization#onComplete did not fail. I think a new transaction (TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:4) was started when the first one (TX:ID:3ab193ce-faeb-42a2-ab39-1e7a92d1cc58:1:2) was rolled back, so the commit applied to the new, empty transaction.

The end result is that InOnlyProducer.sendMessage completes successfully without setting any exception on the exchange, but the message has been rolled back and was never sent. It is lost.
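
To make the ordering problem easier to see without a broker, here is a minimal, self-contained sketch of the sequence described above. It is not Camel or JMS code: FakeSession, FakePool and sendReleaseThenCommit are hypothetical stand-ins invented for illustration. FakeSession models a transacted session whose sends are buffered until commit, and FakePool models only the "too many idle objects, so destroy and roll back" branch of the pool.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class TransactedPoolSketch {

    /** Hypothetical stand-in for a transacted session: sends are buffered until commit. */
    static final class FakeSession {
        private final List<String> pending = new ArrayList<>();
        private final List<String> delivered = new ArrayList<>();

        void send(String body) { pending.add(body); }

        void commit() {
            delivered.addAll(pending);
            pending.clear();
        }

        // Discards uncommitted sends; later commits still succeed, on an empty transaction.
        void rollback() { pending.clear(); }

        List<String> delivered() { return delivered; }
    }

    /** Hypothetical pool: destroys a returned session when maxIdle sessions are already idle. */
    static final class FakePool {
        private final Deque<FakeSession> idle = new ArrayDeque<>();
        private final int maxIdle;

        FakePool(int maxIdle) { this.maxIdle = maxIdle; }

        void release(FakeSession session) {
            if (idle.size() >= maxIdle) {
                // Models destroyObject rolling back a transacted session.
                session.rollback();
            } else {
                idle.push(session);
            }
        }
    }

    /** Send, release to the pool, then commit on exchange completion: the reported order. */
    static int sendReleaseThenCommit(FakePool pool, FakeSession session, String body) {
        session.send(body);
        pool.release(session);
        session.commit(); // no error is raised either way
        return session.delivered().size();
    }

    public static void main(String[] args) {
        int withRoom = sendReleaseThenCommit(new FakePool(1), new FakeSession(), "msg");
        int poolFull = sendReleaseThenCommit(new FakePool(0), new FakeSession(), "msg");
        System.out.println("delivered with room in pool: " + withRoom); // 1
        System.out.println("delivered when pool is full: " + poolFull); // 0
    }
}
```

In this model both calls return normally, which matches the report: nothing signals the failure, and whether the message survives depends only on whether the pool chose to destroy the returned producer.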