[ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:
--------------------------------
    External issue URL:   (was: https://github.com/apache/kafka/pull/13168)

> Interceptor Resource Leak
> -------------------------
>
>                 Key: KAFKA-14565
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14565
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Terry Beard
>            Assignee: Terry Beard
>            Priority: Major
>             Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently within the Kafka Consumer and Kafka Producer constructors,  the 
> *AbstractConfig.getConfiguredInstances()*  is delegated responsibility for 
> both creating and configuring each interceptor listed in the 
> interceptor.classes property and returns a configured  
> *List<ConsumerInterceptor<K,V>>* interceptors.
> This dual responsibility for both creation and configuration is problematic 
> when it involves multiple interceptors where at least one interceptor's 
> configure method implementation creates and/or depends on objects which 
> creates threads, connections or other resources which requires clean up and 
> the subsequent interceptor's configure method raises a runtime exception.  
> This raising of the runtime exception produces a resource leakage in the 
> first interceptor as the interceptor container i.e. 
> ConsumerInterceptors/ProducerInterceptors is never created and therefore the 
> first interceptor's and really any interceptor's close method are never 
> called.  
> To help ensure the respective container interceptors are able to invoke their 
> respective interceptor close methods for proper resource clean up, I propose 
> two approaches:
> +*PROPOSAL 1*+
> Define a default *open* or *configureWithResources()* or *acquireResources()* 
>  method with no implementation and check exception on the respective 
> Consumer/Producer interceptor interfaces.  This method as a part the 
> interceptor life cycle management will be responsible for creating threads 
> and/or objects which utilizes threads, connections or other resource which 
> requires clean up.  Additionally, this default method enables implementation 
> optionality as it's empty default behavior means it will do nothing when 
> unimplemented mitigating backwards compatibility impact to exiting 
> interceptors.  Finally, the Kafka Consumer/Producer Interceptor containers 
> will implement a corresponding *maybeOpen* or *maybeConfigureWithResources* 
> or *maybeAcquireResources* method which also throws a checked exception. 
> See below code excerpt for the Consumer/Producer constructor:
> {code:java}
> List<ConsumerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>         ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ConsumerInterceptor.class,
>         Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> this.interceptors = new ConsumerInterceptors<>(interceptorList);
> this.interceptors.maybeConfigureWithResources();
>  {code}
> +*PROPOSAL 2*+
> To avoid changing any public interfaces and the subsequent KIP process, we can
>  * Create a class which inherits or wraps AbstractConfig that contains a new 
> method which will return a ConfiguredInstanceResult class.  This 
> ConfiguredInstanceResult  class will contain an optional list of successfully 
> created interceptors and/or exception which occurred while calling each 
> Interceptor::configure.  Additionally, it will contain a helper method to 
> rethrow an exception as well as a method which returns the underlying 
> exception.  The caller is expected to handle the exception and perform clean 
> up e.g. call  Interceptor::close  on each interceptor in the list provided by 
> the ConfiguredInstanceResult class.
>  * Automatically invoke {{close}} on any {{Closeable}} or {{AutoCloseable}} 
> instances if/when a failure occurs
>  * Add a new overloaded {{getConfiguredInstance}} / 
> {{getConfiguredInstances}} variant that allows users to specify whether 
> already-instantiated classes should be closed or not when a failure occurs
>  * Add a new exception type to the public API that includes a list of all of 
> the successfully-instantiated (and/or successfully-configured) instances 
> before the error was encountered so that callers can choose how to handle the 
> failure however they want (and possibly so that instantiation/configuration 
> can be attempted on every class before throwing the exception)
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to