[ 
https://issues.apache.org/jira/browse/CAMEL-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bob Browning updated CAMEL-9143:
--------------------------------
    Affects Version/s:     (was: 2.14.3)

> Producers that implement the ServicePoolAware interface cause memory leak due 
> to JMX references
> -----------------------------------------------------------------------------------------------
>
>                 Key: CAMEL-9143
>                 URL: https://issues.apache.org/jira/browse/CAMEL-9143
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.14.1, 2.14.2, 2.15.0, 2.15.1
>            Reporter: Bob Browning
>
> h4. Description
> Producer instances that implement the ServicePoolAware interface will leak 
> memory if their route is stopped, with new producers being leaked every time 
> the route is started/stopped.
> Known implementations that are affected are RemoteFileProducer (ftp, sftp) 
> and Mina2Producer.
> This is due to the behaviour that the SendProcessor which when the route is 
> stopped it shuts down it's `producerCache` instance.
> {code}
>     protected void doStop() throws Exception {
>         ServiceHelper.stopServices(producerCache, producer);
>     }
> {code}
> this in turn calls `stopAndShutdownService(pool)` which will call stop on the 
> SharedProducerServicePool instance which is a NOOP however it also calls 
> shutdown which effects a stop of the global pool (this stops all the 
> registered services and then clears the pool.
> {code}
>     protected void doStop() throws Exception {
>         // when stopping we intend to shutdown
>         ServiceHelper.stopAndShutdownService(pool);
>         try {
>             ServiceHelper.stopAndShutdownServices(producers.values());
>         } finally {
>             // ensure producers are removed, and also from JMX
>             for (Producer producer : producers.values()) {
>                 getCamelContext().removeService(producer);
>             }
>         }
>         producers.clear();
>     }
> {code}
> However no call to `context.removeService(Producer) is called for the entries 
> from the pool only those singleton instances that were in the `producers` map 
> hence the JMX `ManagedProducer` that is created when `doGetProducer` invokes 
> {code}                getCamelContext().addService(answer, false);
> {code} is never removed. 
> Since the global pool is empty when the next request to get a producer is 
> called a new producer is created, jmx wrapper and all, whilst the old 
> instance remains orphaned retaining any objects that pertain to that instance.
> One workaround is for the producer to call 
> {code}getEndpoint().getCamelContext().removeService(this){code} in it's stop 
> method, however this is fairly obscure and it would probably be better to 
> invoke removal of the producer when it is removed from the shared pool.
> Another issue of note is that when a route is shutdown that contains a 
> SendProcessor due to the shutdown invocation on the 
> SharedProcessorServicePool the global pool is cleared of `everything` and 
> remains in `Stopped` state until another route starts it (although it is 
> still accessed and used whilst in the `Stopped` state).
> h4. Impact
> For general use where there is no dynamic creation or passivation of routes 
> this issue should be minimal, however in our use case where the routes are 
> not static, there is a certain amount of recreation of routes as customer 
> endpoints change and there is a need to passivate idle routes this causes a 
> considerable memory leak (via SFTP in particular).
> h4. Test Case
> {code}
> package org.apache.camel.component;
> import com.google.common.util.concurrent.AtomicLongMap;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Consumer;
> import org.apache.camel.Endpoint;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.Producer;
> import org.apache.camel.Route;
> import org.apache.camel.Service;
> import org.apache.camel.ServicePoolAware;
> import org.apache.camel.ServiceStatus;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultComponent;
> import org.apache.camel.impl.DefaultEndpoint;
> import org.apache.camel.impl.DefaultProducer;
> import org.apache.camel.support.LifecycleStrategySupport;
> import org.apache.camel.support.ServiceSupport;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.junit.Test;
> import java.util.Map;
> import static com.google.common.base.Preconditions.checkNotNull;
> /**
>  * Test memory behaviour of producers using {@link ServicePoolAware} when 
> using JMX.
>  */
> public class ServicePoolAwareLeakyTest extends CamelTestSupport {
>   private static final String LEAKY_SIEVE_STABLE = 
> "leaky://sieve-stable?plugged=true";
>   private static final String LEAKY_SIEVE_TRANSIENT = 
> "leaky://sieve-transient?plugged=true";
>   private static boolean isPatchApplied() {
>     return Boolean.parseBoolean(System.getProperty("patchApplied", "false"));
>   }
>   /**
>    * Component that provides leaks producers.
>    */
>   private static class LeakySieveComponent extends DefaultComponent {
>     @Override
>     protected Endpoint createEndpoint(String uri, String remaining, 
> Map<String, Object> parameters) throws Exception {
>       boolean plugged = "true".equalsIgnoreCase((String) 
> parameters.remove("plugged"));
>       return new LeakySieveEndpoint(uri, isPatchApplied() && plugged);
>     }
>   }
>   /**
>    * Endpoint that provides leaky producers.
>    */
>   private static class LeakySieveEndpoint extends DefaultEndpoint {
>     private final String uri;
>     private final boolean plugged;
>     public LeakySieveEndpoint(String uri, boolean plugged) {
>       this.uri = checkNotNull(uri, "uri must not be null");
>       this.plugged = plugged;
>     }
>     @Override
>     public Producer createProducer() throws Exception {
>       return new LeakySieveProducer(this, plugged);
>     }
>     @Override
>     public Consumer createConsumer(Processor processor) throws Exception {
>       throw new UnsupportedOperationException();
>     }
>     @Override
>     public boolean isSingleton() {
>       return true;
>     }
>     @Override
>     protected String createEndpointUri() {
>       return uri;
>     }
>   }
>   /**
>    * Leaky producer - implements {@link ServicePoolAware}.
>    */
>   private static class LeakySieveProducer extends DefaultProducer implements 
> ServicePoolAware {
>     private final boolean plugged;
>     public LeakySieveProducer(Endpoint endpoint, boolean plugged) {
>       super(endpoint);
>       this.plugged = plugged;
>     }
>     @Override
>     public void process(Exchange exchange) throws Exception {
>       // do nothing
>     }
>     @Override
>     protected void doStop() throws Exception {
>       super.doStop();
>       //noinspection ConstantConditions
>       if (plugged) {
>         // need to remove self from services since we are ServicePoolAware 
> this will not be handled for us otherwise we
>         // leak memory
>         getEndpoint().getCamelContext().removeService(this);
>       }
>     }
>   }
>   @Override
>   protected boolean useJmx() {
>     // only occurs when using JMX as the GC root for the producer is through 
> a ManagedProducer created by the
>     // context.addService() invocation
>     return true;
>   }
>   /**
>    * Returns true if verification of state should be performed during the 
> test as opposed to at the end.
>    */
>   public boolean isFailFast() {
>     return false;
>   }
>   /**
>    * Returns true if during fast failure we should verify that the service 
> pool remains in the started state.
>    */
>   public boolean isVerifyProducerServicePoolRemainsStarted() {
>     return false;
>   }
>   @Override
>   public boolean isUseAdviceWith() {
>     return true;
>   }
>   @Test
>   public void testForMemoryLeak() throws Exception {
>     registerLeakyComponent();
>     final AtomicLongMap<String> references = AtomicLongMap.create();
>     // track LeakySieveProducer lifecycle
>     context.addLifecycleStrategy(new LifecycleStrategySupport() {
>       @Override
>       public void onServiceAdd(CamelContext context, Service service, Route 
> route) {
>         if (service instanceof LeakySieveProducer) {
>           references.incrementAndGet(((LeakySieveProducer) 
> service).getEndpoint().getEndpointKey());
>         }
>       }
>       @Override
>       public void onServiceRemove(CamelContext context, Service service, 
> Route route) {
>         if (service instanceof LeakySieveProducer) {
>           references.decrementAndGet(((LeakySieveProducer) 
> service).getEndpoint().getEndpointKey());
>         }
>       }
>     });
>     context.addRoutes(new RouteBuilder() {
>       @Override
>       public void configure() throws Exception {
>         from("direct:sieve-transient")
>             .id("sieve-transient")
>             .to(LEAKY_SIEVE_TRANSIENT);
>         from("direct:sieve-stable")
>             .id("sieve-stable")
>             .to(LEAKY_SIEVE_STABLE);
>       }
>     });
>     context.start();
>     for (int i = 0; i < 1000; i++) {
>       ServiceSupport service = (ServiceSupport) 
> context.getProducerServicePool();
>       assertEquals(ServiceStatus.Started, service.getStatus());
>       if (isFailFast()) {
>         assertEquals(2, context.getProducerServicePool().size());
>         assertEquals(1, references.get(LEAKY_SIEVE_TRANSIENT));
>         assertEquals(1, references.get(LEAKY_SIEVE_STABLE));
>       }
>       context.stopRoute("sieve-transient");
>       if (isFailFast()) {
>         assertEquals("Expected no service references to remain", 0, 
> references.get(LEAKY_SIEVE_TRANSIENT));
>       }
>       if (isFailFast()) {
>         // looks like we cleared more than just our route, we've stopped and 
> cleared the global ProducerServicePool
>         // since SendProcessor.stop() invokes 
> ServiceHelper.stopServices(producerCache, producer); which in turn invokes
>         // ServiceHelper.stopAndShutdownService(pool);.
>         //
>         // Whilst stop on the SharedProducerServicePool is a NOOP shutdown is 
> not and effects a stop of the pool.
>         if (isVerifyProducerServicePoolRemainsStarted()) {
>          assertEquals(ServiceStatus.Started, service.getStatus());
>         }
>         assertEquals("Expected one stable producer to remain pooled", 1, 
> context.getProducerServicePool().size());
>         assertEquals("Expected one stable producer to remain as service", 1, 
> references.get(LEAKY_SIEVE_STABLE));
>       }
>       // Send a body to verify behaviour of send producer after another route 
> has been stopped
>       sendBody("direct:sieve-stable", "");
>       if (isFailFast()) {
>         // shared pool is used despite being 'Stopped'
>         if (isVerifyProducerServicePoolRemainsStarted()) {
>           assertEquals(ServiceStatus.Started, service.getStatus());
>         }
>         assertEquals("Expected only stable producer in pool", 1, 
> context.getProducerServicePool().size());
>         assertEquals("Expected no references to transient producer", 0, 
> references.get(LEAKY_SIEVE_TRANSIENT));
>         assertEquals("Expected reference to stable producer", 1, 
> references.get(LEAKY_SIEVE_STABLE));
>       }
>       context.startRoute("sieve-transient");
>       // ok, back to normal
>       assertEquals(ServiceStatus.Started, service.getStatus());
>       if (isFailFast()) {
>         assertEquals("Expected both producers in pool", 2, 
> context.getProducerServicePool().size());
>         assertEquals("Expected one transient producer as service", 1, 
> references.get(LEAKY_SIEVE_TRANSIENT));
>         assertEquals("Expected one stable producer as service", 1, 
> references.get(LEAKY_SIEVE_STABLE));
>       }
>     }
>     if (!isFailFast()) {
>       assertEquals("Expected both producers in pool", 2, 
> context.getProducerServicePool().size());
>       // if not fixed these will equal the number of iterations in the loop + 
> 1
>       assertEquals("Expected one transient producer as service", 1, 
> references.get(LEAKY_SIEVE_TRANSIENT));
>       assertEquals("Expected one stable producer as service", 1, 
> references.get(LEAKY_SIEVE_STABLE));
>     }
>   }
>   private void registerLeakyComponent() {
>     // register leaky component
>     context.addComponent("leaky", new LeakySieveComponent());
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to