Hi

Yes, that sounds like a likely cause of the problem.
Camel 2.x will be EOL soon, and we only fix very serious security
issues in 2.x. So you can either patch the code yourself or stop using
the transferExchange option (that option is not recommended anyway,
because it relies on Java object serialization, which is bad).
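
To illustrate the failure mode from the report, here is a minimal sketch
in plain Java with no Camel dependency. FakeExchange and
FakeInflightRepository are hypothetical stand-ins, not the real Camel
classes, and the sketch assumes the in-flight registry is keyed by
exchange id, as the report describes. The restoreId flag shows one
possible shape of a local patch (putting the original id back after
unmarshalling); that is an assumption for illustration, not a fix that
exists in Camel.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InflightLeakSketch {

    // Hypothetical stand-in for an Exchange: only the mutable id matters here.
    static final class FakeExchange {
        private String exchangeId;

        FakeExchange(String exchangeId) {
            this.exchangeId = exchangeId;
        }

        String getExchangeId() {
            return exchangeId;
        }

        void setExchangeId(String exchangeId) {
            this.exchangeId = exchangeId;
        }
    }

    // Hypothetical stand-in for an in-flight registry keyed by exchange id.
    static final class FakeInflightRepository {
        private final Map<String, FakeExchange> inflight = new ConcurrentHashMap<>();

        void add(FakeExchange exchange) {
            inflight.put(exchange.getExchangeId(), exchange);
        }

        void remove(FakeExchange exchange) {
            // Looks up by the CURRENT id, so a changed id removes nothing.
            inflight.remove(exchange.getExchangeId());
        }

        int size() {
            return inflight.size();
        }
    }

    // Returns how many entries are left behind after processing `messages` exchanges.
    static int leakedAfter(int messages, boolean restoreId) {
        FakeInflightRepository repository = new FakeInflightRepository();
        for (int i = 0; i < messages; i++) {
            FakeExchange exchange = new FakeExchange("consumer-" + i);
            repository.add(exchange);

            String originalId = exchange.getExchangeId();
            // Simulates unmarshal overwriting the id with the one from the payload.
            exchange.setExchangeId("producer-" + i);
            if (restoreId) {
                // Assumed workaround: put the consumer-side id back.
                exchange.setExchangeId(originalId);
            }

            repository.remove(exchange);
        }
        return repository.size();
    }

    public static void main(String[] args) {
        System.out.println("left in-flight without restoring id: " + leakedAfter(1000, false));
        System.out.println("left in-flight with id restored:     " + leakedAfter(1000, true));
    }
}
```

Without restoring the id, every processed exchange stays in the map, so
the map grows with the message count. With the id restored, the map is
empty again after each message.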

On Wed, Nov 24, 2021 at 7:23 AM Mi Jane 米静 <[email protected]> wrote:
>
> Dear community,
>
> We have two Camel services, camel-A and camel-B, and both use the
> camel-jms component.
>
> Camel-A is the JMS producer with:
> <to uri="jms:queue:queue.name?disableTimeToLive=true&amp;transferExchange=true&amp;exchangePattern=InOnly"/>
> Camel-B is the JMS consumer with:
> <from uri="jms:queue:{{amq.artemis.consume.queue.name}}?transferExchange=true&amp;receiveTimeout=100000"/>
>
> After stress testing, camel-B threw an OOM exception, and we found that the
> root cause of the memory leak is DefaultInflightRepository:
>
>   1.  When camel-B first receives the exchange, it is added to
> DefaultInflightRepository with a newly generated exchange ID.
>   2.  The exchange's in message is an ObjectMessage because we set
> transferExchange=true. JmsBinding in camel-B calls
> DefaultExchangeHolder.unmarshal(exchange, holder), which replaces the
> exchange ID with the exchange ID carried in the payload.
>   3.  When camel-B tries to remove the exchange from
> DefaultInflightRepository, the removal fails because the exchange ID has
> changed and is no longer in the key set.
>
> We’re using Camel 2.21.0, and some source code is included below for
> reference. Could someone confirm whether this is a known defect in this
> version of Camel? Or are we using it incorrectly? Thanks a lot!
>
> org.apache.camel.component.jms.JmsBinding
>
>     /**
>      * Extracts the body from the JMS message
>      *
>      * @param exchange the exchange
>      * @param message  the message to extract its body
>      * @return the body, can be <tt>null</tt>
>      */
>     public Object extractBodyFromJms(Exchange exchange, Message message) {
> ……
>             if (message instanceof ObjectMessage) {
>                 LOG.trace("Extracting body as a ObjectMessage from JMS message: {}", message);
>                 ObjectMessage objectMessage = (ObjectMessage)message;
>                 Object payload = objectMessage.getObject();
>                 if (payload instanceof DefaultExchangeHolder) {
>                     DefaultExchangeHolder holder = (DefaultExchangeHolder) payload;
>                     DefaultExchangeHolder.unmarshal(exchange, holder);
>                     // enrich with JMS headers also as otherwise they will get lost when use the transferExchange option.
>                     Map<String, Object> jmsHeaders = extractHeadersFromJms(message, exchange);
>                     exchange.getIn().getHeaders().putAll(jmsHeaders);
>                     return exchange.getIn().getBody();
>                 } else {
>                     return objectMessage.getObject();
>                 }
>             }
> ……
>         } catch (JMSException e) {
>             throw new RuntimeCamelException("Failed to extract body due to: " + e + ". Message: " + message, e);
>         }
>     }
>
> org.apache.camel.impl.DefaultExchangeHolder
>
>     /**
>      * Transfers the information from the payload to the exchange.
>      *
>      * @param exchange the exchange to set values from the payload, must <b>not</b> be <tt>null</tt>
>      * @param payload  the payload with the values, must <b>not</b> be <tt>null</tt>
>      */
>     public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) {
>         ObjectHelper.notNull(exchange, "exchange");
>         ObjectHelper.notNull(payload, "payload");
>
>         exchange.setExchangeId(payload.exchangeId);
>         exchange.getIn().setBody(payload.inBody);
>         if (payload.inHeaders != null) {
>             exchange.getIn().setHeaders(payload.inHeaders);
>         }
>         if (payload.inFaultFlag != null) {
>             exchange.getIn().setFault(payload.inFaultFlag);
>         }
>         if (payload.outBody != null) {
>             exchange.getOut().setBody(payload.outBody);
>             if (payload.outHeaders != null) {
>                 exchange.getOut().setHeaders(payload.outHeaders);
>             }
>             if (payload.outFaultFlag != null) {
>                 exchange.getOut().setFault(payload.outFaultFlag);
>             }
>         }
>         if (payload.properties != null) {
>             for (String key : payload.properties.keySet()) {
>                 exchange.setProperty(key, payload.properties.get(key));
>             }
>         }
>         exchange.setException(payload.exception);
>     }
>
>
> Regards,
> Jane
>


-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
