[ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:
--------------------------------
    Description: 
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, the Kafka Consumer and Kafka Producer constructors delegate to 
AbstractConfig.getConfiguredInstances() the responsibility for both creating 
and configuring each interceptor listed in the interceptor.classes property. 
It returns the configured list, e.g. List<ConsumerInterceptor<K,V>>.

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are configured, at least one interceptor's 
configure method creates (or depends on objects that create) threads, 
connections or other resources that require cleanup, and a subsequent 
interceptor's configure method raises a runtime exception.  The exception 
causes a resource leak in the first interceptor: the interceptor container 
(ConsumerInterceptors/ProducerInterceptors) is never created, so the close 
method of the first interceptor, and indeed of any interceptor, is never 
called. 
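
The failure mode described above can be reproduced with a small model.  This 
is only a sketch: DemoInterceptor, ResourceHoldingInterceptor, 
FailingInterceptor and createAndConfigure are hypothetical stand-ins, not 
Kafka classes, and the loop only imitates a combined create-and-configure 
step rather than reproducing AbstractConfig.getConfiguredInstances().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for ConsumerInterceptor/ProducerInterceptor.
interface DemoInterceptor {
    void configure(Map<String, ?> configs);
    void close();
}

// Acquires a "resource" in configure(); only close() releases it.
class ResourceHoldingInterceptor implements DemoInterceptor {
    static int openResources = 0; // resources acquired but not yet released

    @Override
    public void configure(Map<String, ?> configs) {
        openResources++; // stands in for starting a thread or opening a connection
    }

    @Override
    public void close() {
        openResources--;
    }
}

// Fails during configuration.
class FailingInterceptor implements DemoInterceptor {
    @Override
    public void configure(Map<String, ?> configs) {
        throw new IllegalStateException("bad configuration");
    }

    @Override
    public void close() { }
}

class LeakDemo {
    // Simplified model of a step that both creates and configures instances.
    static List<DemoInterceptor> createAndConfigure(
            List<Supplier<DemoInterceptor>> suppliers, Map<String, ?> configs) {
        List<DemoInterceptor> result = new ArrayList<>();
        for (Supplier<DemoInterceptor> supplier : suppliers) {
            DemoInterceptor interceptor = supplier.get();
            interceptor.configure(configs); // if this throws, earlier instances are lost
            result.add(interceptor);
        }
        return result;
    }

    // Returns the number of resources still open after a failed construction.
    static int run() {
        ResourceHoldingInterceptor.openResources = 0;
        List<Supplier<DemoInterceptor>> suppliers = new ArrayList<>();
        suppliers.add(ResourceHoldingInterceptor::new);
        suppliers.add(FailingInterceptor::new);
        try {
            createAndConfigure(suppliers, new HashMap<String, Object>());
        } catch (IllegalStateException e) {
            // The caller never received the list, so it has nothing to close.
        }
        return ResourceHoldingInterceptor.openResources;
    }

    public static void main(String[] args) {
        System.out.println("open resources after failure: " + run());
    }
}
```

In this model the caller has no reference to the first interceptor once the 
second one throws, which is why its close method cannot be reached.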
 

To help ensure that the interceptor containers can invoke each interceptor's 
close method for proper resource cleanup, I propose defining a default method, 
named open, configureWithResources() or acquireResources(), on the 
Consumer/Producer interceptor interfaces.  The method has an empty body and 
declares a checked exception.  It will be responsible for creating threads 
and/or objects that use threads, connections or other resources that require 
cleanup.  Because the default implementation is empty, implementing it is 
optional: it does nothing when not overridden, which mitigates the backwards 
compatibility impact on existing interceptors.
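
A minimal sketch of what such a default method could look like follows.  The 
interface LifecycleInterceptor and both implementations are hypothetical 
stand-ins rather than Kafka's actual interfaces, and open() is just one of 
the candidate names listed above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the interceptor interfaces; not Kafka's actual API.
interface LifecycleInterceptor {
    void configure(Map<String, ?> configs);

    // Proposed hook. The empty default keeps existing implementations
    // source-compatible; the checked exception lets implementations report
    // acquisition failures.
    default void open() throws Exception { }

    void close();
}

// An interceptor written before open() existed: it compiles unchanged.
class LegacyInterceptor implements LifecycleInterceptor {
    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}

// A newer interceptor that defers resource acquisition to open().
class PooledInterceptor implements LifecycleInterceptor {
    private Thread worker; // null until open() is called

    @Override
    public void configure(Map<String, ?> configs) {
        // Read settings only; acquire nothing here.
    }

    @Override
    public void open() {
        worker = new Thread(() -> { /* background work would go here */ });
        worker.setDaemon(true);
        worker.start();
    }

    boolean hasResources() {
        return worker != null;
    }

    @Override
    public void close() {
        if (worker != null) {
            worker.interrupt();
            worker = null;
        }
    }
}

class DefaultOpenDemo {
    // Returns true if the inherited default open() completes without error.
    static boolean legacyOpenSucceeds() {
        LifecycleInterceptor legacy = new LegacyInterceptor();
        legacy.configure(new HashMap<String, Object>());
        try {
            legacy.open(); // inherited empty default
            return true;
        } catch (Exception e) {
            return false;
        } finally {
            legacy.close();
        }
    }
}
```

Keeping configure free of side effects is the design choice that matters 
here: an interceptor that has only been configured holds nothing that needs 
to be released.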

Additionally, the Kafka Consumer/Producer interceptor containers will 
implement a corresponding maybeOpen, maybeConfigureWithResources or 
maybeAcquireResources method, which also throws a checked exception.  
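
One way the container side could work is sketched below.  InterceptorContainer 
is a hypothetical stand-in for ConsumerInterceptors/ProducerInterceptors, 
OpenableInterceptor and TrackingInterceptor are illustrative only, and the 
decision to close every interceptor when one fails to open is an assumption 
of this sketch, not something the proposal specifies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in; not Kafka's actual interceptor interface.
interface OpenableInterceptor {
    default void open() throws Exception { }
    void close();
}

// Hypothetical stand-in for ConsumerInterceptors/ProducerInterceptors.
class InterceptorContainer implements AutoCloseable {
    private final List<OpenableInterceptor> interceptors;

    InterceptorContainer(List<OpenableInterceptor> interceptors) {
        this.interceptors = new ArrayList<>(interceptors);
    }

    // Opens every interceptor; if one fails, closes all of them and rethrows.
    void maybeOpen() throws Exception {
        try {
            for (OpenableInterceptor interceptor : interceptors) {
                interceptor.open();
            }
        } catch (Exception e) {
            close();
            throw e;
        }
    }

    @Override
    public void close() {
        for (OpenableInterceptor interceptor : interceptors) {
            try {
                interceptor.close();
            } catch (RuntimeException e) {
                // Keep closing the remaining interceptors.
            }
        }
    }
}

// Records whether it currently holds resources and whether close() ran.
class TrackingInterceptor implements OpenableInterceptor {
    private final boolean failOnOpen;
    boolean opened = false;
    boolean closed = false;

    TrackingInterceptor(boolean failOnOpen) {
        this.failOnOpen = failOnOpen;
    }

    @Override
    public void open() throws Exception {
        if (failOnOpen) {
            throw new Exception("could not acquire resources");
        }
        opened = true;
    }

    @Override
    public void close() {
        opened = false;
        closed = true;
    }
}

class ContainerDemo {
    // Returns true if the first interceptor was closed after the second failed to open.
    static boolean firstIsCleanedUp() {
        TrackingInterceptor first = new TrackingInterceptor(false);
        TrackingInterceptor second = new TrackingInterceptor(true);
        List<OpenableInterceptor> list = new ArrayList<>();
        list.add(first);
        list.add(second);
        InterceptorContainer container = new InterceptorContainer(list);
        try {
            container.maybeOpen();
            return false; // the second interceptor was expected to fail
        } catch (Exception e) {
            return first.closed && !first.opened;
        }
    }
}
```

Because the container exists before any resources are acquired, a failure in 
one interceptor's open step still leaves a path to every close method.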

Alternatively, to avoid changing a public interface and the KIP process that 
such a change requires, AbstractConfig.getConfiguredInstances() can be 
replaced with a new factory that wraps the AbstractConfig and introduces a 
new factory method that addresses the problem.  
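
A rough sketch of the factory alternative follows.  ClosingInterceptorFactory 
and its createConfigured method are hypothetical names, ManagedInterceptor is 
a stand-in for the real interceptor interfaces, and a plain Map takes the 
place of the wrapped AbstractConfig so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for ConsumerInterceptor/ProducerInterceptor.
interface ManagedInterceptor {
    void configure(Map<String, ?> configs);
    void close();
}

// Hypothetical factory; in the proposal it would wrap an AbstractConfig.
class ClosingInterceptorFactory {
    private final Map<String, ?> configs;

    ClosingInterceptorFactory(Map<String, ?> configs) {
        this.configs = configs;
    }

    // Creates and configures each interceptor. If any configure() call throws,
    // every instance created so far is closed before the exception propagates.
    List<ManagedInterceptor> createConfigured(List<Supplier<ManagedInterceptor>> suppliers) {
        List<ManagedInterceptor> created = new ArrayList<>();
        try {
            for (Supplier<ManagedInterceptor> supplier : suppliers) {
                ManagedInterceptor interceptor = supplier.get();
                created.add(interceptor); // track first, so a partial failure is still closed
                interceptor.configure(configs);
            }
            return created;
        } catch (RuntimeException e) {
            for (ManagedInterceptor interceptor : created) {
                try {
                    interceptor.close();
                } catch (RuntimeException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            }
            throw e;
        }
    }
}

// Acquires a counted "resource" in configure() and releases it in close().
class CountingInterceptor implements ManagedInterceptor {
    static int openResources = 0;
    private boolean acquired = false;

    @Override
    public void configure(Map<String, ?> configs) {
        openResources++;
        acquired = true;
    }

    @Override
    public void close() {
        if (acquired) {
            openResources--;
            acquired = false;
        }
    }
}

// Always fails during configuration.
class BrokenInterceptor implements ManagedInterceptor {
    @Override
    public void configure(Map<String, ?> configs) {
        throw new IllegalStateException("bad configuration");
    }

    @Override
    public void close() { }
}

class FactoryDemo {
    // Returns the number of resources still open after a failed construction.
    static int run() {
        CountingInterceptor.openResources = 0;
        List<Supplier<ManagedInterceptor>> suppliers = new ArrayList<>();
        suppliers.add(CountingInterceptor::new);
        suppliers.add(BrokenInterceptor::new);
        ClosingInterceptorFactory factory =
                new ClosingInterceptorFactory(new HashMap<String, Object>());
        try {
            factory.createConfigured(suppliers);
        } catch (IllegalStateException e) {
            // Expected: the second interceptor failed to configure.
        }
        return CountingInterceptor.openResources;
    }
}
```

This variant leaves the interceptor interfaces untouched; the cleanup happens 
entirely inside the factory method.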

 

 

  was:
The Consumer and Producer interceptor interfaces and their corresponding Kafka 
Consumer and Producer constructors do not adequately support cleanup of 
underlying interceptor resources. 

Currently, the Kafka Consumer and Kafka Producer constructors delegate to 
AbstractConfig.getConfiguredInstances() the responsibility for both creating 
and configuring each interceptor listed in the interceptor.classes property. 
It returns the configured list, e.g. List<ConsumerInterceptor<K,V>>.

This dual responsibility for both creation and configuration is problematic 
when multiple interceptors are configured, at least one interceptor's 
configure method creates (or depends on objects that create) threads, 
connections or other resources that require cleanup, and a subsequent 
interceptor's configure method raises a runtime exception.  The exception 
causes a resource leak in the first interceptor: the interceptor container 
(ConsumerInterceptors/ProducerInterceptors) is never created, so the close 
method of the first interceptor, and indeed of any interceptor, is never 
called. 

To help ensure that the interceptor containers can invoke each interceptor's 
close method for proper resource cleanup, I propose defining a default open 
method on the Consumer/Producer interceptor interfaces.  The method has an 
empty body and declares a checked exception.  This open method will be 
responsible for creating threads and/or objects that use threads, connections 
or other resources that require cleanup.  Because the default implementation 
is empty, implementing it is optional: it does nothing when not overridden.  

Additionally, the Kafka Consumer/Producer interceptor containers will 
implement a corresponding maybeOpen method, which throws a checked exception.  
To maintain backwards compatibility with previously developed interceptors, 
maybeOpen will check whether the interceptor's interface contains the newer 
open method before calling it.   


> Improve Interceptor Resource Leakage Prevention
> -----------------------------------------------
>
>                 Key: KAFKA-14565
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14565
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Terry Beard
>            Assignee: Terry Beard
>            Priority: Major
>              Labels: needs-kip
>             Fix For: 3.5.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
