Dear community,

We have two Camel services, camel-A and camel-B, and both use the camel-jms component.
Camel-A is the JMS producer:

    <to uri="jms:queue:queue.name?disableTimeToLive=true&transferExchange=true&exchangePattern=InOnly"/>

Camel-B is the JMS consumer:

    <from uri="jms:queue:{{amq.artemis.consume.queue.name}}?transferExchange=true&receiveTimeout=100000"/>

After stress testing, camel-B hit an OOM exception, and we traced the memory leak to DefaultInflightRepository:

1. When camel-B first receives the exchange, it is added to DefaultInflightRepository under a newly generated exchange ID.
2. Because we set transferExchange=true, the exchange's In message is an ObjectMessage. JmsBinding in camel-B calls DefaultExchangeHolder.unmarshal(exchange, holder), which replaces the exchange ID with the one carried inside the payload.
3. When camel-B tries to remove the exchange from DefaultInflightRepository, the removal fails because the exchange ID has changed and is no longer in the key set.

We are using Camel 2.21.0; the relevant source code is included below for reference. Could someone confirm whether this is a known defect in this version of Camel, or whether we are using the component incorrectly?

Thanks a lot!

org.apache.camel.component.jms.JmsBinding

    /**
     * Extracts the body from the JMS message
     *
     * @param exchange the exchange
     * @param message  the message to extract its body
     * @return the body, can be <tt>null</tt>
     */
    public Object extractBodyFromJms(Exchange exchange, Message message) {
        ……
            if (message instanceof ObjectMessage) {
                LOG.trace("Extracting body as a ObjectMessage from JMS message: {}", message);
                ObjectMessage objectMessage = (ObjectMessage)message;
                Object payload = objectMessage.getObject();
                if (payload instanceof DefaultExchangeHolder) {
                    DefaultExchangeHolder holder = (DefaultExchangeHolder) payload;
                    DefaultExchangeHolder.unmarshal(exchange, holder);
                    // enrich with JMS headers also as otherwise they will get lost when use the transferExchange option.
                    Map<String, Object> jmsHeaders = extractHeadersFromJms(message, exchange);
                    exchange.getIn().getHeaders().putAll(jmsHeaders);
                    return exchange.getIn().getBody();
                } else {
                    return objectMessage.getObject();
                }
            }
        ……
        } catch (JMSException e) {
            throw new RuntimeCamelException("Failed to extract body due to: " + e + ". Message: " + message, e);
        }
    }

org.apache.camel.impl.DefaultExchangeHolder

    /**
     * Transfers the information from the payload to the exchange.
     *
     * @param exchange the exchange to set values from the payload, must <b>not</b> be <tt>null</tt>
     * @param payload  the payload with the values, must <b>not</b> be <tt>null</tt>
     */
    public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) {
        ObjectHelper.notNull(exchange, "exchange");
        ObjectHelper.notNull(payload, "payload");

        exchange.setExchangeId(payload.exchangeId);
        exchange.getIn().setBody(payload.inBody);
        if (payload.inHeaders != null) {
            exchange.getIn().setHeaders(payload.inHeaders);
        }
        if (payload.inFaultFlag != null) {
            exchange.getIn().setFault(payload.inFaultFlag);
        }
        if (payload.outBody != null) {
            exchange.getOut().setBody(payload.outBody);
            if (payload.outHeaders != null) {
                exchange.getOut().setHeaders(payload.outHeaders);
            }
            if (payload.outFaultFlag != null) {
                exchange.getOut().setFault(payload.outFaultFlag);
            }
        }
        if (payload.properties != null) {
            for (String key : payload.properties.keySet()) {
                exchange.setProperty(key, payload.properties.get(key));
            }
        }
        exchange.setException(payload.exception);
    }

Regards,
Jane
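
For anyone who wants to see the mechanism from steps 1 to 3 in isolation, here is a minimal sketch that does not depend on Camel at all. It assumes, as described above, that the in-flight registry is a map keyed by exchange ID. FakeExchange, FakeInflightRegistry and leakedAfter are hypothetical names made up for this illustration; they are not Camel classes, and this is not the actual DefaultInflightRepository code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InflightLeakDemo {

    /** Hypothetical stand-in for an exchange: only a mutable ID. */
    static final class FakeExchange {
        private String exchangeId;
        FakeExchange(String exchangeId) { this.exchangeId = exchangeId; }
        String getExchangeId() { return exchangeId; }
        void setExchangeId(String exchangeId) { this.exchangeId = exchangeId; }
    }

    /** Hypothetical stand-in for a registry keyed by exchange ID (assumption). */
    static final class FakeInflightRegistry {
        private final Map<String, FakeExchange> inflight = new ConcurrentHashMap<>();
        void add(FakeExchange e) { inflight.put(e.getExchangeId(), e); }
        // Removal looks up the CURRENT ID, so it misses if the ID changed after add().
        void remove(FakeExchange e) { inflight.remove(e.getExchangeId()); }
        int size() { return inflight.size(); }
    }

    /** Processes the given number of messages and returns how many entries stay behind. */
    public static int leakedAfter(int messages, boolean overwriteId) {
        FakeInflightRegistry registry = new FakeInflightRegistry();
        for (int i = 0; i < messages; i++) {
            // Step 1: registered under the consumer-side ID.
            FakeExchange exchange = new FakeExchange("consumer-" + i);
            registry.add(exchange);
            // Step 2: the ID is replaced by the one carried in the payload.
            if (overwriteId) {
                exchange.setExchangeId("producer-" + i);
            }
            // Step 3: removal uses the new ID and finds nothing to remove.
            registry.remove(exchange);
        }
        return registry.size();
    }

    public static void main(String[] args) {
        System.out.println("ID overwritten: " + leakedAfter(1000, true) + " entries left");
        System.out.println("ID unchanged:   " + leakedAfter(1000, false) + " entries left");
    }
}
```

With the ID overwritten, every processed message leaves one entry behind (1000 after 1000 messages); with the ID left unchanged the registry ends up empty. That matches the steady growth we see on camel-B under load.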