Dear community,
We have 2 camel services: camel-A and camel-B and both of them use camel-jms
component.
Camel-A is the JMS producer with: <to
uri="jms:queue:queue.name?disableTimeToLive=true&transferExchange=true&exchangePattern=InOnly"/>
Camel-B is the JMS consumer with: <from
uri="jms:queue:{{amq.artemis.consume.queue.name}}?transferExchange=true&receiveTimeout=100000"/>
After stress testing, we got camel-B OOM exception and found the memory leak
root cause is DefaultInflightRepository:
1. When first time camel-B receive the exchange, it’s added into
DefaultInflightRepository with a new generated exchange ID
2. The exchange.in message is an ObjectMessage because we set
transferExchange=true. Camel-B JmsBinding called
DefaultExchangeHolder.unmarshal(exchange, holder) and replaced the exchange id
with inside payload exchange id
3. When camel-B try to remove the exchange from DefaultInfligntRepository,
it failed because the exchange id has changed and it’s no longer in the key set.
We’re using camel 2.21.0 and below is some source code for reference. Could
someone help to double confirm whether it’s a known defeat for this version of
camel? Or is there any error usage in our code? Thanks a lot!
org.apache.camel.component.jms.JmsBinding
/**
* Extracts the body from the JMS message
*
* @param exchange the exchange
* @param message the message to extract its body
* @return the body, can be <tt>null</tt>
*/
public Object extractBodyFromJms(Exchange exchange, Message message) {
……
if (message instanceof ObjectMessage) {
LOG.trace("Extracting body as a ObjectMessage from JMS message:
{}", message);
ObjectMessage objectMessage = (ObjectMessage)message;
Object payload = objectMessage.getObject();
if (payload instanceof DefaultExchangeHolder) {
DefaultExchangeHolder holder = (DefaultExchangeHolder)
payload;
DefaultExchangeHolder.unmarshal(exchange, holder);
// enrich with JMS headers also as otherwise they will get
lost when use the transferExchange option.
Map<String, Object> jmsHeaders =
extractHeadersFromJms(message, exchange);
exchange.getIn().getHeaders().putAll(jmsHeaders);
return exchange.getIn().getBody();
} else {
return objectMessage.getObject();
}
}
……
} catch (JMSException e) {
throw new RuntimeCamelException("Failed to extract body due to: " +
e + ". Message: " + message, e);
}
}
org.apache.camel.impl. DefaultExchangeHolder
/**
* Transfers the information from the payload to the exchange.
*
* @param exchange the exchange to set values from the payload, must
<b>not</b> be <tt>null</tt>
* @param payload the payload with the values, must <b>not</b> be
<tt>null</tt>
*/
public static void unmarshal(Exchange exchange, DefaultExchangeHolder
payload) {
ObjectHelper.notNull(exchange, "exchange");
ObjectHelper.notNull(payload, "payload");
exchange.setExchangeId(payload.exchangeId);
exchange.getIn().setBody(payload.inBody);
if (payload.inHeaders != null) {
exchange.getIn().setHeaders(payload.inHeaders);
}
if (payload.inFaultFlag != null) {
exchange.getIn().setFault(payload.inFaultFlag);
}
if (payload.outBody != null) {
exchange.getOut().setBody(payload.outBody);
if (payload.outHeaders != null) {
exchange.getOut().setHeaders(payload.outHeaders);
}
if (payload.outFaultFlag != null) {
exchange.getOut().setFault(payload.outFaultFlag);
}
}
if (payload.properties != null) {
for (String key : payload.properties.keySet()) {
exchange.setProperty(key, payload.properties.get(key));
}
}
exchange.setException(payload.exception);
}
Regards,
Jane