Bob Browning created CAMEL-9143:
-----------------------------------

             Summary: Producers that implement the ServicePoolAware interface 
cause memory leak due to JMX references
                 Key: CAMEL-9143
                 URL: https://issues.apache.org/jira/browse/CAMEL-9143
             Project: Camel
          Issue Type: Bug
          Components: camel-core
    Affects Versions: 2.15.1, 2.15.0, 2.14.3, 2.14.2, 2.14.1
            Reporter: Bob Browning


h4. Description

Producer instances that implement the ServicePoolAware interface will leak 
memory if their route is stopped, with new producers being leaked every time 
the route is started/stopped.

Known implementations that are affected are RemoteFileProducer (ftp, sftp) and 
Mina2Producer.

This is due to the behaviour that the SendProcessor which when the route is 
stopped it shuts down it's `producerCache` instance.

{code}
    protected void doStop() throws Exception {
        ServiceHelper.stopServices(producerCache, producer);
    }
{code}

this in turn calls `stopAndShutdownService(pool)` which will call stop on the 
SharedProducerServicePool instance which is a NOOP however it also calls 
shutdown which effects a stop of the global pool (this stops all the registered 
services and then clears the pool.

{code}
    protected void doStop() throws Exception {
        // when stopping we intend to shutdown
        ServiceHelper.stopAndShutdownService(pool);
        try {
            ServiceHelper.stopAndShutdownServices(producers.values());
        } finally {
            // ensure producers are removed, and also from JMX
            for (Producer producer : producers.values()) {
                getCamelContext().removeService(producer);
            }
        }
        producers.clear();
    }
{code}

However no call to `context.removeService(Producer) is called for the entries 
from the pool only those singleton instances that were in the `producers` map 
hence the JMX `ManagedProducer` that is created when `doGetProducer` invokes 
{code}                getCamelContext().addService(answer, false);
{code} is never removed. 

Since the global pool is empty when the next request to get a producer is 
called a new producer is created, jmx wrapper and all, whilst the old instance 
remains orphaned retaining any objects that pertain to that instance.

One workaround is for the producer to call 
{code}getEndpoint().getCamelContext().removeService(this){code} in it's stop 
method, however this is fairly obscure and it would probably be better to 
invoke removal of the producer when it is removed from the shared pool.

Another issue of note is that when a route is shutdown that contains a 
SendProcessor due to the shutdown invocation on the SharedProcessorServicePool 
the global pool is cleared of `everything`.

h4. Impact

For general use where there is no dynamic creation or passivation of routes 
this issue should be minimal, however in our use case where the routes are not 
static, there is a certain amount of recreation of routes as customer endpoints 
change and there is a need to passivate idle routes this causes a considerable 
memory leak (via SFTP in particular).

h4. Test Case
{code}
package org.apache.camel.component;

import com.google.common.util.concurrent.AtomicLongMap;

import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Route;
import org.apache.camel.Service;
import org.apache.camel.ServicePoolAware;
import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultComponent;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.DefaultProducer;
import org.apache.camel.support.LifecycleStrategySupport;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

import java.util.Map;

import static com.google.common.base.Preconditions.checkNotNull;

/**
 * Test memory behaviour of producers using {@link ServicePoolAware} when using 
JMX.
 */
public class ServicePoolAwareLeakyTest extends CamelTestSupport {

  private static final String LEAKY_SIEVE_STABLE = 
"leaky://sieve-stable?plugged=true";
  private static final String LEAKY_SIEVE_TRANSIENT = 
"leaky://sieve-transient?plugged=true";


  private static boolean isPatchApplied() {
    return Boolean.parseBoolean(System.getProperty("patchApplied", "false"));
  }

  /**
   * Component that provides leaks producers.
   */
  private static class LeakySieveComponent extends DefaultComponent {
    @Override
    protected Endpoint createEndpoint(String uri, String remaining, Map<String, 
Object> parameters) throws Exception {
      boolean plugged = "true".equalsIgnoreCase((String) 
parameters.remove("plugged"));
      return new LeakySieveEndpoint(uri, isPatchApplied() && plugged);
    }
  }

  /**
   * Endpoint that provides leaky producers.
   */
  private static class LeakySieveEndpoint extends DefaultEndpoint {

    private final String uri;
    private final boolean plugged;

    public LeakySieveEndpoint(String uri, boolean plugged) {
      this.uri = checkNotNull(uri, "uri must not be null");
      this.plugged = plugged;
    }

    @Override
    public Producer createProducer() throws Exception {
      return new LeakySieveProducer(this, plugged);
    }

    @Override
    public Consumer createConsumer(Processor processor) throws Exception {
      throw new UnsupportedOperationException();
    }

    @Override
    public boolean isSingleton() {
      return true;
    }

    @Override
    protected String createEndpointUri() {
      return uri;
    }
  }

  /**
   * Leaky producer - implements {@link ServicePoolAware}.
   */
  private static class LeakySieveProducer extends DefaultProducer implements 
ServicePoolAware {

    private final boolean plugged;

    public LeakySieveProducer(Endpoint endpoint, boolean plugged) {
      super(endpoint);
      this.plugged = plugged;
    }

    @Override
    public void process(Exchange exchange) throws Exception {
      // do nothing
    }

    @Override
    protected void doStop() throws Exception {
      super.doStop();

      //noinspection ConstantConditions
      if (plugged) {
        // need to remove self from services since we are ServicePoolAware this 
will not be handled for us otherwise we
        // leak memory
        getEndpoint().getCamelContext().removeService(this);
      }
    }
  }

  @Override
  protected boolean useJmx() {
    // only occurs when using JMX as the GC root for the producer is through a 
ManagedProducer created by the
    // context.addService() invocation
    return true;
  }

  /**
   * Returns true if verification of state should be performed during the test 
as opposed to at the end.
   */
  public boolean isFailFast() {
    return false;
  }

  /**
   * Returns true if during fast failure we should verify that the service pool 
remains in the started state.
   */
  public boolean isVerifyProducerServicePoolRemainsStarted() {
    return false;
  }

  @Override
  public boolean isUseAdviceWith() {
    return true;
  }

  @Test
  public void testForMemoryLeak() throws Exception {
    registerLeakyComponent();

    final AtomicLongMap<String> references = AtomicLongMap.create();

    // track LeakySieveProducer lifecycle
    context.addLifecycleStrategy(new LifecycleStrategySupport() {
      @Override
      public void onServiceAdd(CamelContext context, Service service, Route 
route) {
        if (service instanceof LeakySieveProducer) {
          references.incrementAndGet(((LeakySieveProducer) 
service).getEndpoint().getEndpointKey());
        }
      }

      @Override
      public void onServiceRemove(CamelContext context, Service service, Route 
route) {
        if (service instanceof LeakySieveProducer) {
          references.decrementAndGet(((LeakySieveProducer) 
service).getEndpoint().getEndpointKey());
        }
      }
    });

    context.addRoutes(new RouteBuilder() {
      @Override
      public void configure() throws Exception {
        from("direct:sieve-transient")
            .id("sieve-transient")
            .to(LEAKY_SIEVE_TRANSIENT);

        from("direct:sieve-stable")
            .id("sieve-stable")
            .to(LEAKY_SIEVE_STABLE);
      }
    });

    context.start();

    for (int i = 0; i < 1000; i++) {
      ServiceSupport service = (ServiceSupport) 
context.getProducerServicePool();
      assertEquals(ServiceStatus.Started, service.getStatus());
      if (isFailFast()) {
        assertEquals(2, context.getProducerServicePool().size());
        assertEquals(1, references.get(LEAKY_SIEVE_TRANSIENT));
        assertEquals(1, references.get(LEAKY_SIEVE_STABLE));
      }

      context.stopRoute("sieve-transient");

      if (isFailFast()) {
        assertEquals("Expected no service references to remain", 0, 
references.get(LEAKY_SIEVE_TRANSIENT));
      }

      if (isFailFast()) {
        // looks like we cleared more than just our route, we've stopped and 
cleared the global ProducerServicePool
        // since SendProcessor.stop() invokes 
ServiceHelper.stopServices(producerCache, producer); which in turn invokes
        // ServiceHelper.stopAndShutdownService(pool);.
        //
        // Whilst stop on the SharedProducerServicePool is a NOOP shutdown is 
not and effects a stop of the pool.

        if (isVerifyProducerServicePoolRemainsStarted()) {
         assertEquals(ServiceStatus.Started, service.getStatus());
        }
        assertEquals("Expected one stable producer to remain pooled", 1, 
context.getProducerServicePool().size());
        assertEquals("Expected one stable producer to remain as service", 1, 
references.get(LEAKY_SIEVE_STABLE));
      }

      // Send a body to verify behaviour of send producer after another route 
has been stopped
      sendBody("direct:sieve-stable", "");

      if (isFailFast()) {
        // shared pool is used despite being 'Stopped'
        if (isVerifyProducerServicePoolRemainsStarted()) {
          assertEquals(ServiceStatus.Started, service.getStatus());
        }

        assertEquals("Expected only stable producer in pool", 1, 
context.getProducerServicePool().size());
        assertEquals("Expected no references to transient producer", 0, 
references.get(LEAKY_SIEVE_TRANSIENT));
        assertEquals("Expected reference to stable producer", 1, 
references.get(LEAKY_SIEVE_STABLE));
      }

      context.startRoute("sieve-transient");

      // ok, back to normal
      assertEquals(ServiceStatus.Started, service.getStatus());
      if (isFailFast()) {
        assertEquals("Expected both producers in pool", 2, 
context.getProducerServicePool().size());
        assertEquals("Expected one transient producer as service", 1, 
references.get(LEAKY_SIEVE_TRANSIENT));
        assertEquals("Expected one stable producer as service", 1, 
references.get(LEAKY_SIEVE_STABLE));
      }
    }

    if (!isFailFast()) {
      assertEquals("Expected both producers in pool", 2, 
context.getProducerServicePool().size());

      // if not fixed these will equal the number of iterations in the loop + 1
      assertEquals("Expected one transient producer as service", 1, 
references.get(LEAKY_SIEVE_TRANSIENT));
      assertEquals("Expected one stable producer as service", 1, 
references.get(LEAKY_SIEVE_STABLE));
    }
  }

  private void registerLeakyComponent() {
    // register leaky component
    context.addComponent("leaky", new LeakySieveComponent());
  }
}
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to