[ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17680003#comment-17680003
 ] 

Terry Beard commented on KAFKA-14565:
-------------------------------------

[~ChrisEgerton]  I'm a noob when it comes to participating in open source 
projects and as such any advice on the process/goals are most welcomed! 

> Improve Interceptor Resource Leakage Prevention
> -----------------------------------------------
>
>                 Key: KAFKA-14565
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14565
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Terry Beard
>            Assignee: Terry Beard
>            Priority: Major
>              Labels: needs-kip
>             Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently within the Kafka Consumer and Kafka Producer constructors,  the 
> *AbstractConfig.getConfiguredInstances()*  is delegated responsibility for 
> both creating and configuring each interceptor listed in the 
> interceptor.classes property and returns a configured  
> *List<ConsumerInterceptor<K,V>>* interceptors.
> This dual responsibility for both creation and configuration is problematic 
> when it involves multiple interceptors where at least one interceptor's 
> configure method implementation creates and/or depends on objects which 
> creates threads, connections or other resources which requires clean up and 
> the subsequent interceptor's configure method raises a runtime exception.  
> This raising of the runtime exception produces a resource leakage in the 
> first interceptor as the interceptor container i.e. 
> ConsumerInterceptors/ProducerInterceptors is never created and therefore the 
> first interceptor's and really any interceptor's close method are never 
> called.  
> To help ensure the respective container interceptors are able to invoke their 
> respective interceptor close methods for proper resource clean up, I propose 
> two approaches:
> +*PROPOSAL 1*+
> Define a default *open* or *configureWithResources()* or *acquireResources()* 
>  method with no implementation and check exception on the respective 
> Consumer/Producer interceptor interfaces.  This method as a part the 
> interceptor life cycle management will be responsible for creating threads 
> and/or objects which utilizes threads, connections or other resource which 
> requires clean up.  Additionally, this default method enables implementation 
> optionality as it's empty default behavior means it will do nothing when 
> unimplemented mitigating backwards compatibility impact to exiting 
> interceptors.  Finally, the Kafka Consumer/Producer Interceptor containers 
> will implement a corresponding *maybeOpen* or *maybeConfigureWithResources* 
> or *maybeAcquireResources* method which also throws a checked exception. 
> See below code excerpt for the Consumer/Producer constructor:
> {code:java}
> List<ConsumerInterceptor<K, V>> interceptorList = (List) 
> config.getConfiguredInstances(
>         ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ConsumerInterceptor.class,
>         Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> this.interceptors = new ConsumerInterceptors<>(interceptorList);
> this.interceptors.maybeConfigureWithResources();
>  {code}
> +*PROPOSAL 2*+
> To avoid changing a public interface and the subsequent KIP process we can 
> add a new method on the *AbstractConfig* class called 
> *getConfiguredInstanceResult()* which returns a 
> {*}ConfiguredInstanceResult<{*}{*}ConsumerInterceptor<K, V>>{*} object.  A 
> call to *ConfiguredInstanceResult.getConfiguredInstances()* returns an 
> optional list of interceptors. 
> If an exception occurs during configuration of one or more interceptors, the 
> *AbstractConfig.getConfiguredInstanceResult()* will abort, capture the 
> exception while maintaining an optional list of previously created 
> interceptors.  A call to *ConfiguredInstanceResult.getException()* returns an 
> optional exception at which time the caller can determine how to handle e.g. 
> perform clean up.  However, if the caller wants to rethrow the exception 
> where existing catch logic can handle, it can call 
> *ConfiguredInstanceResult.maybeThrowWhenAnyConfigurationFails().*  This 
> method will do nothing when there is no exception. 
> See below code excerpt example for the Consumer/Producer constructor: 
>  
> {code:java}
> ConfiguredInstanceResult<ConsumerInterceptor<K, V>> configuredInstanceResult 
> = (ConfiguredInstanceResult) config.getConfiguredInstanceResult(
>                     ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>                     ConsumerInterceptor.class,
>                     Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, 
> clientId));
> List<ConsumerInterceptor<K, V>> interceptorList = 
> configuredInstanceResult.getInstances().orElse(null);
> configuredInstanceResult.throwWhenAnyConfigurationFails();{code}
>  In terms of impact to the developers, one could argue coding may be required 
> in both approaches to get the full benefit of either.  However, I'm open to 
> either approach or another approach outside of these.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to