[ https://issues.apache.org/jira/browse/CAMEL-9143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14791797#comment-14791797 ]
Claus Ibsen commented on CAMEL-9143: ------------------------------------ Singleton endpoints/producers are also possible with ServicePoolAware, though that is not so common. I have a fix, and with the test case there are no MBean leaks anymore. > Producers that implement the ServicePoolAware interface cause memory leak due > to JMX references > ----------------------------------------------------------------------------------------------- > > Key: CAMEL-9143 > URL: https://issues.apache.org/jira/browse/CAMEL-9143 > Project: Camel > Issue Type: Bug > Components: camel-core > Affects Versions: 2.14.1, 2.14.2, 2.15.0, 2.15.1 > Reporter: Bob Browning > Assignee: Claus Ibsen > Fix For: 2.16.0, 2.15.4 > > > h4. Description > Producer instances that implement the ServicePoolAware interface will leak > memory if their route is stopped, with new producers being leaked every time > the route is started/stopped. > Known implementations that are affected are RemoteFileProducer (ftp, sftp) > and Mina2Producer. > This is due to the behaviour of the SendProcessor, which when the route is > stopped shuts down its `producerCache` instance. > {code} > protected void doStop() throws Exception { > ServiceHelper.stopServices(producerCache, producer); > } > {code} > This in turn calls `stopAndShutdownService(pool)` which will call stop on the > SharedProducerServicePool instance, which is a NOOP; however it also calls > shutdown, which effects a stop of the global pool (this stops all the > registered services and then clears the pool). 
> {code} > protected void doStop() throws Exception { > // when stopping we intend to shutdown > ServiceHelper.stopAndShutdownService(pool); > try { > ServiceHelper.stopAndShutdownServices(producers.values()); > } finally { > // ensure producers are removed, and also from JMX > for (Producer producer : producers.values()) { > getCamelContext().removeService(producer); > } > } > producers.clear(); > } > {code} > However no call to `context.removeService(Producer)` is made for the entries > from the pool, only for those singleton instances that were in the `producers` map, > hence the JMX `ManagedProducer` that is created when `doGetProducer` invokes > {code} getCamelContext().addService(answer, false); > {code} is never removed. > Since the global pool is empty when the next request to get a producer is > made, a new producer is created, jmx wrapper and all, whilst the old > instance remains orphaned retaining any objects that pertain to that instance. > One workaround is for the producer to call > {code}getEndpoint().getCamelContext().removeService(this){code} in its stop > method, however this is fairly obscure and it would probably be better to > invoke removal of the producer when it is removed from the shared pool. > Another issue of note is that when a route is shutdown that contains a > SendProcessor, due to the shutdown invocation on the > SharedProducerServicePool the global pool is cleared of `everything` and > remains in `Stopped` state until another route starts it (although it is > still accessed and used whilst in the `Stopped` state). > h4. Impact > For general use where there is no dynamic creation or passivation of routes > this issue should be minimal, however in our use case where the routes are > not static, there is a certain amount of recreation of routes as customer > endpoints change and there is a need to passivate idle routes this causes a > considerable memory leak (via SFTP in particular). > h4. 
Test Case > {code} > package org.apache.camel.component; > import com.google.common.util.concurrent.AtomicLongMap; > import org.apache.camel.CamelContext; > import org.apache.camel.Consumer; > import org.apache.camel.Endpoint; > import org.apache.camel.Exchange; > import org.apache.camel.Processor; > import org.apache.camel.Producer; > import org.apache.camel.Route; > import org.apache.camel.Service; > import org.apache.camel.ServicePoolAware; > import org.apache.camel.ServiceStatus; > import org.apache.camel.builder.RouteBuilder; > import org.apache.camel.impl.DefaultComponent; > import org.apache.camel.impl.DefaultEndpoint; > import org.apache.camel.impl.DefaultProducer; > import org.apache.camel.support.LifecycleStrategySupport; > import org.apache.camel.support.ServiceSupport; > import org.apache.camel.test.junit4.CamelTestSupport; > import org.junit.Test; > import java.util.Map; > import static com.google.common.base.Preconditions.checkNotNull; > /** > * Test memory behaviour of producers using {@link ServicePoolAware} when > using JMX. > */ > public class ServicePoolAwareLeakyTest extends CamelTestSupport { > private static final String LEAKY_SIEVE_STABLE = > "leaky://sieve-stable?plugged=true"; > private static final String LEAKY_SIEVE_TRANSIENT = > "leaky://sieve-transient?plugged=true"; > private static boolean isPatchApplied() { > return Boolean.parseBoolean(System.getProperty("patchApplied", "false")); > } > /** > * Component that provides leaks producers. > */ > private static class LeakySieveComponent extends DefaultComponent { > @Override > protected Endpoint createEndpoint(String uri, String remaining, > Map<String, Object> parameters) throws Exception { > boolean plugged = "true".equalsIgnoreCase((String) > parameters.remove("plugged")); > return new LeakySieveEndpoint(uri, isPatchApplied() && plugged); > } > } > /** > * Endpoint that provides leaky producers. 
> */ > private static class LeakySieveEndpoint extends DefaultEndpoint { > private final String uri; > private final boolean plugged; > public LeakySieveEndpoint(String uri, boolean plugged) { > this.uri = checkNotNull(uri, "uri must not be null"); > this.plugged = plugged; > } > @Override > public Producer createProducer() throws Exception { > return new LeakySieveProducer(this, plugged); > } > @Override > public Consumer createConsumer(Processor processor) throws Exception { > throw new UnsupportedOperationException(); > } > @Override > public boolean isSingleton() { > return true; > } > @Override > protected String createEndpointUri() { > return uri; > } > } > /** > * Leaky producer - implements {@link ServicePoolAware}. > */ > private static class LeakySieveProducer extends DefaultProducer implements > ServicePoolAware { > private final boolean plugged; > public LeakySieveProducer(Endpoint endpoint, boolean plugged) { > super(endpoint); > this.plugged = plugged; > } > @Override > public void process(Exchange exchange) throws Exception { > // do nothing > } > @Override > protected void doStop() throws Exception { > super.doStop(); > //noinspection ConstantConditions > if (plugged) { > // need to remove self from services since we are ServicePoolAware > this will not be handled for us otherwise we > // leak memory > getEndpoint().getCamelContext().removeService(this); > } > } > } > @Override > protected boolean useJmx() { > // only occurs when using JMX as the GC root for the producer is through > a ManagedProducer created by the > // context.addService() invocation > return true; > } > /** > * Returns true if verification of state should be performed during the > test as opposed to at the end. > */ > public boolean isFailFast() { > return false; > } > /** > * Returns true if during fast failure we should verify that the service > pool remains in the started state. 
> */ > public boolean isVerifyProducerServicePoolRemainsStarted() { > return false; > } > @Override > public boolean isUseAdviceWith() { > return true; > } > @Test > public void testForMemoryLeak() throws Exception { > registerLeakyComponent(); > final AtomicLongMap<String> references = AtomicLongMap.create(); > // track LeakySieveProducer lifecycle > context.addLifecycleStrategy(new LifecycleStrategySupport() { > @Override > public void onServiceAdd(CamelContext context, Service service, Route > route) { > if (service instanceof LeakySieveProducer) { > references.incrementAndGet(((LeakySieveProducer) > service).getEndpoint().getEndpointKey()); > } > } > @Override > public void onServiceRemove(CamelContext context, Service service, > Route route) { > if (service instanceof LeakySieveProducer) { > references.decrementAndGet(((LeakySieveProducer) > service).getEndpoint().getEndpointKey()); > } > } > }); > context.addRoutes(new RouteBuilder() { > @Override > public void configure() throws Exception { > from("direct:sieve-transient") > .id("sieve-transient") > .to(LEAKY_SIEVE_TRANSIENT); > from("direct:sieve-stable") > .id("sieve-stable") > .to(LEAKY_SIEVE_STABLE); > } > }); > context.start(); > for (int i = 0; i < 1000; i++) { > ServiceSupport service = (ServiceSupport) > context.getProducerServicePool(); > assertEquals(ServiceStatus.Started, service.getStatus()); > if (isFailFast()) { > assertEquals(2, context.getProducerServicePool().size()); > assertEquals(1, references.get(LEAKY_SIEVE_TRANSIENT)); > assertEquals(1, references.get(LEAKY_SIEVE_STABLE)); > } > context.stopRoute("sieve-transient"); > if (isFailFast()) { > assertEquals("Expected no service references to remain", 0, > references.get(LEAKY_SIEVE_TRANSIENT)); > } > if (isFailFast()) { > // looks like we cleared more than just our route, we've stopped and > cleared the global ProducerServicePool > // since SendProcessor.stop() invokes > ServiceHelper.stopServices(producerCache, producer); which in turn 
invokes > // ServiceHelper.stopAndShutdownService(pool);. > // > // Whilst stop on the SharedProducerServicePool is a NOOP shutdown is > not and effects a stop of the pool. > if (isVerifyProducerServicePoolRemainsStarted()) { > assertEquals(ServiceStatus.Started, service.getStatus()); > } > assertEquals("Expected one stable producer to remain pooled", 1, > context.getProducerServicePool().size()); > assertEquals("Expected one stable producer to remain as service", 1, > references.get(LEAKY_SIEVE_STABLE)); > } > // Send a body to verify behaviour of send producer after another route > has been stopped > sendBody("direct:sieve-stable", ""); > if (isFailFast()) { > // shared pool is used despite being 'Stopped' > if (isVerifyProducerServicePoolRemainsStarted()) { > assertEquals(ServiceStatus.Started, service.getStatus()); > } > assertEquals("Expected only stable producer in pool", 1, > context.getProducerServicePool().size()); > assertEquals("Expected no references to transient producer", 0, > references.get(LEAKY_SIEVE_TRANSIENT)); > assertEquals("Expected reference to stable producer", 1, > references.get(LEAKY_SIEVE_STABLE)); > } > context.startRoute("sieve-transient"); > // ok, back to normal > assertEquals(ServiceStatus.Started, service.getStatus()); > if (isFailFast()) { > assertEquals("Expected both producers in pool", 2, > context.getProducerServicePool().size()); > assertEquals("Expected one transient producer as service", 1, > references.get(LEAKY_SIEVE_TRANSIENT)); > assertEquals("Expected one stable producer as service", 1, > references.get(LEAKY_SIEVE_STABLE)); > } > } > if (!isFailFast()) { > assertEquals("Expected both producers in pool", 2, > context.getProducerServicePool().size()); > // if not fixed these will equal the number of iterations in the loop + > 1 > assertEquals("Expected one transient producer as service", 1, > references.get(LEAKY_SIEVE_TRANSIENT)); > assertEquals("Expected one stable producer as service", 1, > 
references.get(LEAKY_SIEVE_STABLE)); > } > } > private void registerLeakyComponent() { > // register leaky component > context.addComponent("leaky", new LeakySieveComponent()); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)