Dear community,

We have two Camel services, camel-A and camel-B, and both use the camel-jms
component.

Camel-A is the JMS producer with:
<to uri="jms:queue:queue.name?disableTimeToLive=true&amp;transferExchange=true&amp;exchangePattern=InOnly"/>

Camel-B is the JMS consumer with:
<from uri="jms:queue:{{amq.artemis.consume.queue.name}}?transferExchange=true&amp;receiveTimeout=100000"/>

After stress testing, camel-B failed with an OOM error, and we traced the
memory leak to DefaultInflightRepository:

  1.  When camel-B first receives the exchange, it is added to
DefaultInflightRepository under a newly generated exchange ID.
  2.  The exchange's in message is an ObjectMessage because we set
transferExchange=true. JmsBinding in camel-B calls
DefaultExchangeHolder.unmarshal(exchange, holder), which replaces the exchange
ID with the exchange ID carried inside the payload.
  3.  When camel-B then tries to remove the exchange from
DefaultInflightRepository, the removal fails because the exchange ID has
changed and is no longer in the key set.
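To make the three steps concrete, here is a minimal, self-contained sketch of the mechanism. It does not use Camel at all: FakeExchange and FakeInflightRepository are hypothetical stand-ins, the ID strings are made up, and the only assumption carried over is that the repository is keyed by exchange ID, as described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InflightLeakSketch {

    /** Hypothetical stand-in for an exchange: only the mutable ID matters here. */
    static final class FakeExchange {
        private String exchangeId;

        FakeExchange(String exchangeId) {
            this.exchangeId = exchangeId;
        }

        String getExchangeId() {
            return exchangeId;
        }

        void setExchangeId(String exchangeId) {
            this.exchangeId = exchangeId;
        }
    }

    /** Hypothetical stand-in for an inflight registry keyed by exchange ID. */
    static final class FakeInflightRepository {
        private final Map<String, FakeExchange> inflight = new ConcurrentHashMap<>();

        void add(FakeExchange exchange) {
            inflight.put(exchange.getExchangeId(), exchange);
        }

        void remove(FakeExchange exchange) {
            // Looks up by the exchange's CURRENT ID, not the ID it was added under.
            inflight.remove(exchange.getExchangeId());
        }

        int size() {
            return inflight.size();
        }
    }

    /** Replays steps 1 to 3 for each message and returns the entries left behind. */
    public static int simulate(int messages) {
        FakeInflightRepository repository = new FakeInflightRepository();
        for (int i = 0; i < messages; i++) {
            // Step 1: registered under a newly generated consumer-side ID.
            FakeExchange exchange = new FakeExchange("ID-camel-B-" + i);
            repository.add(exchange);
            // Step 2: unmarshalling overwrites the ID with the one from the payload.
            exchange.setExchangeId("ID-camel-A-" + i);
            // Step 3: removal uses the new ID, so the original entry stays in the map.
            repository.remove(exchange);
        }
        return repository.size();
    }

    public static void main(String[] args) {
        System.out.println("Entries still inflight: " + simulate(3));
    }
}
```

In this sketch every message leaves one entry behind, so the map grows with the number of messages consumed, which matches the growth we see under stress.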

We’re using Camel 2.21.0, and the relevant source code is below for reference.
Could someone confirm whether this is a known defect in this version of Camel,
or whether we are using the component incorrectly? Thanks a lot!

org.apache.camel.component.jms.JmsBinding

/**
     * Extracts the body from the JMS message
     *
     * @param exchange the exchange
     * @param message  the message to extract its body
     * @return the body, can be <tt>null</tt>
     */
    public Object extractBodyFromJms(Exchange exchange, Message message) {
……
            if (message instanceof ObjectMessage) {
                LOG.trace("Extracting body as a ObjectMessage from JMS message: {}", message);
                ObjectMessage objectMessage = (ObjectMessage)message;
                Object payload = objectMessage.getObject();
                if (payload instanceof DefaultExchangeHolder) {
                    DefaultExchangeHolder holder = (DefaultExchangeHolder) payload;
                    DefaultExchangeHolder.unmarshal(exchange, holder);
                    // enrich with JMS headers also as otherwise they will get lost when use the transferExchange option.
                    Map<String, Object> jmsHeaders = extractHeadersFromJms(message, exchange);
                    exchange.getIn().getHeaders().putAll(jmsHeaders);
                    return exchange.getIn().getBody();
                } else {
                    return objectMessage.getObject();
                }
            }
……
        } catch (JMSException e) {
            throw new RuntimeCamelException("Failed to extract body due to: " + e + ". Message: " + message, e);
        }
    }

org.apache.camel.impl.DefaultExchangeHolder

/**
     * Transfers the information from the payload to the exchange.
     *
     * @param exchange the exchange to set values from the payload, must <b>not</b> be <tt>null</tt>
     * @param payload  the payload with the values, must <b>not</b> be <tt>null</tt>
     */
    public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) {
        ObjectHelper.notNull(exchange, "exchange");
        ObjectHelper.notNull(payload, "payload");

        exchange.setExchangeId(payload.exchangeId);
        exchange.getIn().setBody(payload.inBody);
        if (payload.inHeaders != null) {
            exchange.getIn().setHeaders(payload.inHeaders);
        }
        if (payload.inFaultFlag != null) {
            exchange.getIn().setFault(payload.inFaultFlag);
        }
        if (payload.outBody != null) {
            exchange.getOut().setBody(payload.outBody);
            if (payload.outHeaders != null) {
                exchange.getOut().setHeaders(payload.outHeaders);
            }
            if (payload.outFaultFlag != null) {
                exchange.getOut().setFault(payload.outFaultFlag);
            }
        }
        if (payload.properties != null) {
            for (String key : payload.properties.keySet()) {
                exchange.setProperty(key, payload.properties.get(key));
            }
        }
        exchange.setException(payload.exception);
    }
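For illustration only, the sketch below contrasts the behaviour shown above with what we would have expected: the consumer-side exchange ID staying stable across unmarshal so that the later removal succeeds. SketchExchange, SketchHolder and unmarshalKeepingId are hypothetical names and the ID strings are made up; nothing here is Camel's API, and we have not tested this idea against Camel itself.

```java
import java.util.HashMap;
import java.util.Map;

public class StableIdSketch {

    /** Hypothetical stand-in for an exchange with a mutable ID and body. */
    static final class SketchExchange {
        String id;
        Object body;

        SketchExchange(String id) {
            this.id = id;
        }
    }

    /** Hypothetical stand-in for the transferred holder payload. */
    static final class SketchHolder {
        final String exchangeId;
        final Object inBody;

        SketchHolder(String exchangeId, Object inBody) {
            this.exchangeId = exchangeId;
            this.inBody = inBody;
        }
    }

    /** Mimics the unmarshal quoted above: overwrites both the ID and the body. */
    static void unmarshal(SketchExchange exchange, SketchHolder payload) {
        exchange.id = payload.exchangeId;
        exchange.body = payload.inBody;
    }

    /** Assumed alternative: copy the payload but restore the consumer-side ID. */
    static void unmarshalKeepingId(SketchExchange exchange, SketchHolder payload) {
        String consumerSideId = exchange.id;
        unmarshal(exchange, payload);
        exchange.id = consumerSideId;
    }

    /** Returns how many entries remain registered after one message is processed. */
    public static int inflightAfter(boolean keepId) {
        Map<String, SketchExchange> inflight = new HashMap<>();
        SketchExchange exchange = new SketchExchange("ID-consumer-1");
        inflight.put(exchange.id, exchange);

        SketchHolder holder = new SketchHolder("ID-producer-1", "hello");
        if (keepId) {
            unmarshalKeepingId(exchange, holder);
        } else {
            unmarshal(exchange, holder);
        }

        // Removal by current ID only succeeds if the ID was left unchanged.
        inflight.remove(exchange.id);
        return inflight.size();
    }

    public static void main(String[] args) {
        System.out.println("ID overwritten, entries left: " + inflightAfter(false));
        System.out.println("ID kept stable, entries left: " + inflightAfter(true));
    }
}
```

Whether keeping the ID stable is acceptable inside Camel (for example, if other code relies on the producer-side ID) is exactly what we are unsure about.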


Regards,
Jane
