[ 
https://issues.apache.org/jira/browse/NIFI-7034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17018078#comment-17018078
 ] 

Tamas Palfy commented on NIFI-7034:
-----------------------------------

To be honest, I'm not sure why we need to recreate the worker here at all. 
I think it would be better to destroy the worker if there is any kind of issue 
and let the next call create a new one as needed in {{onTrigger}}:

{code:java}
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        T worker = workerPool.poll();
        if (worker == null) {
            worker = buildTargetResource(context);
        }
{code}
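
A minimal sketch of that idea, not the actual NiFi code: {{PooledWorker}}, {{LazyWorkerPool}} and {{FakeWorker}} are hypothetical names made up for illustration, and the {{Supplier}} stands in for {{buildTargetResource(context)}}. It assumes the worker exposes {{isValid()}} and {{shutdown()}}. An invalid worker is shut down and dropped, and nothing is rebuilt in the cleanup path; the next acquire builds a fresh worker only if the pool is empty.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical stand-in for the JMS worker; only the two methods used here.
interface PooledWorker {
    boolean isValid();
    void shutdown();
}

// Hypothetical stand-in for the processor's pool handling.
final class LazyWorkerPool<T extends PooledWorker> {
    private final Queue<T> workerPool = new ConcurrentLinkedQueue<>();
    // Plays the role of buildTargetResource(context) in this sketch.
    private final Supplier<T> factory;

    LazyWorkerPool(Supplier<T> factory) {
        this.factory = factory;
    }

    // Mirrors the start of onTrigger: reuse a pooled worker or build one.
    T acquire() {
        T worker = workerPool.poll();
        if (worker == null) {
            worker = factory.get();
        }
        return worker;
    }

    // Mirrors the finally block: keep valid workers, destroy invalid ones.
    void release(T worker) {
        if (worker == null) {
            return;
        }
        if (worker.isValid()) {
            workerPool.offer(worker);
            return;
        }
        try {
            // Close the underlying resources before dropping the reference.
            worker.shutdown();
        } catch (RuntimeException e) {
            // Best effort: the worker is discarded either way.
        }
    }

    int idleCount() {
        return workerPool.size();
    }
}

// Test double used only to exercise the sketch.
final class FakeWorker implements PooledWorker {
    boolean valid = true;
    boolean shutDown = false;

    @Override
    public boolean isValid() {
        return valid;
    }

    @Override
    public void shutdown() {
        shutDown = true;
    }
}
```

With this shape there is no rebuild attempt in the cleanup path, so a failure to reconnect surfaces on the next trigger instead of being swallowed in the {{finally}} block.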

Maybe {{resetConnectionFactory}} was intended to do the cleanup, but it won't 
actually do anything because the passed {{ConnectionFactory}} is a wrapper 
around the one within the {{cfProvider}} (and they need to be _identical_ for 
the reset to do anything). Worse, even if it _did_ do the reset, all the reset 
does is set the reference to null. The reference would then stay null until 
the service is disabled and re-enabled, so creating the worker anew would 
fail immediately.
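
To illustrate the two failure modes described above, here is a small sketch with made-up types ({{Factory}}, {{TargetFactory}}, {{WrappingFactory}}, {{FactoryProvider}} are hypothetical, not the NiFi or Spring classes). It assumes, as described in this comment, that the reset compares by reference identity and only nulls the held reference.

```java
// Hypothetical minimal types for illustration only.
interface Factory {
}

final class TargetFactory implements Factory {
}

// Wraps another factory, so it is never the same object as its target.
final class WrappingFactory implements Factory {
    private final Factory target;

    WrappingFactory(Factory target) {
        this.target = target;
    }

    Factory getTarget() {
        return target;
    }
}

final class FactoryProvider {
    private Factory held;

    FactoryProvider(Factory held) {
        this.held = held;
    }

    // Assumed behaviour: clears the reference only for the identical instance.
    void reset(Factory candidate) {
        if (candidate == held) {
            held = null;
        }
    }

    // Once cleared, nothing in this sketch restores the reference.
    Factory get() {
        if (held == null) {
            throw new IllegalStateException("no factory until the provider is re-enabled");
        }
        return held;
    }

    boolean isCleared() {
        return held == null;
    }
}
```

Passing a {{WrappingFactory}} around the held instance leaves the provider untouched, which is the no-op case. Passing the held instance itself clears it, after which {{get()}} fails until something outside this sketch restores the reference.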

> Connection leak with JMSConsumer and JMSPublisher
> -------------------------------------------------
>
>                 Key: NIFI-7034
>                 URL: https://issues.apache.org/jira/browse/NIFI-7034
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.10.0
>         Environment: Discovered against ActiveMQ.
>            Reporter: Gardella Juan Pablo
>            Assignee: Gardella Juan Pablo
>            Priority: Critical
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> JMS connections are not closed in case of a failure. Discovered against 
> ActiveMQ, but it applies to other JMS servers. 
> The problem happens when an exception is raised and the worker is marked as 
> invalid. The current code discards the worker before closing it properly. 
> Details below.
> h3. Details
> Any exception happening to a ConsumerJMS or PublisherJMS marks the worker as 
> invalid. After that, the worker is discarded (the worker object reference is 
> never cleaned up). The relevant code snippet is below:
> {code:java|title=AbstractJMSProcessor}
>         } finally {
>             //in case of exception during worker's connection (consumer or publisher),
>             //an appropriate service is responsible to invalidate the worker.
>             //if worker is not valid anymore, don't put it back into a pool, try to rebuild it first, or discard.
>             //this will be helpful in a situation, when JNDI has changed, or JMS server is not available
>             //and reconnection is required.
>             if (worker == null || !worker.isValid()){
>                 getLogger().debug("Worker is invalid. Will try re-create... ");
>                 final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
>                 try {
>                     // Safe to cast. Method #buildTargetResource(ProcessContext context) sets only CachingConnectionFactory
>                     CachingConnectionFactory currentCF = (CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory();
>                     cfProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
>                     worker = buildTargetResource(context);
>                 }catch(Exception e) {
>                     getLogger().error("Failed to rebuild:  " + cfProvider);
>                     worker = null;
>                 }
>             }
> {code}
> Before the worker is discarded, all resources associated with it should be 
> cleaned up. The proper solution is to call {{worker.shutdown()}} and then discard it.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
